ifw-daq  3.0.1
IFW Data Acquisition modules
main.cpp
Go to the documentation of this file.
1 /**
2  * @file
3  * @ingroup server
4  * @copyright ESO - European Southern Observatory
5  * @author
6  *
7  * @brief main source file.
8  */
9 
10 #include "actionMgr.hpp"
11 #include "actionsStd.hpp"
12 #include "dataContext.hpp"
13 #include "dbInterface.hpp"
14 #include "logger.hpp"
15 #include "ocmDaqService.hpp"
16 #include "stdCmdsImpl.hpp"
17 
18 #include <Stdif.hpp>
19 #include <events.rad.hpp>
20 
21 #include <ciiLogManager.hpp>
22 #include <rad/actionCallback.hpp>
23 #include <rad/exceptions.hpp>
24 #include <rad/logger.hpp>
25 #include <rad/mal/publisher.hpp>
26 #include <rad/mal/replier.hpp>
27 #include <rad/mal/utils.hpp>
28 #include <rad/smAdapter.hpp>
29 
30 #include <scxml4cpp/Context.h>
31 #include <scxml4cpp/EventQueue.h>
32 
33 #include <boost/asio.hpp>
34 #include <boost/exception/diagnostic_information.hpp>
35 #include <boost/filesystem.hpp>
36 #include <log4cplus/hierarchy.h>
37 
38 #include <fmt/format.h>
39 
40 #include <functional>
41 #include <memory>
42 
43 #include <daq/conversion.hpp>
44 #include <daq/daqController.hpp>
45 #include <daq/dpmClient.hpp>
46 #include <daq/error/report.hpp>
47 #include <daq/manager.hpp>
48 #include <daq/workspace.hpp>
49 #include <daqif/uri.hpp>
50 
51 class DaqService {
52 public:
53  DaqService(std::string name,
54  daq::ManagerParams manager_params,
55  std::filesystem::path const& workspace,
56  std::string const& output_path,
57  rad::IoExecutor& executor,
58  std::shared_ptr<mal::Mal> mal,
59  rad::cii::Replier& replier,
60  rad::cii::Publisher<daqif::DaqStatus>& publisher,
61  rad::SMAdapter& state_machine,
62  daq::DpmClientParams const& dpm_params)
63  : m_name(std::move(name))
64  , m_workspace(workspace)
65  , m_mal(std::move(mal))
66  , m_event_log(std::make_shared<daq::ObservableEventLog>())
67  , m_dpm_client(
68  std::make_shared<daq::DpmClientImpl>(executor.get_io_context(), *m_mal, dpm_params))
69  , m_daq_factory(executor.get_io_context(), *m_mal, m_dpm_client)
70  , m_manager(executor,
71  std::move(manager_params),
72  m_workspace,
73  m_event_log,
74  m_daq_factory,
75  m_dpm_client,
76  log4cplus::Logger::getInstance(server::LOGGER_NAME_MANAGER))
77  , m_service(std::make_shared<OcmDaqService>(
78  executor.get_io_context(), *m_mal, m_manager, m_name, output_path, m_event_log))
79  , m_replier(replier)
80  , m_publisher(publisher)
81  , m_state_machine(state_machine)
82  , m_is_active(false) {
83  m_sample = m_publisher.CreateTopic();
84  m_connection = m_manager.GetStatusSignal().ConnectObserver(
85  [this](auto const& status) { this->DaqStatusUpdate(status); });
86 
87  // Try restoring from workspace
88  try {
89  m_manager.RestoreFromWorkspace();
90  } catch (...) {
91  daq::error::NestedExceptionReporter r(std::current_exception());
92  LOG4CPLUS_ERROR(server::GetLogger(),
93  "Failed to restore state from OCM workspace at "
94  << m_workspace.GetPath().native() << ":\n"
95  << r);
96  }
97  }
98 
99  void Register() {
100  LOG4CPLUS_DEBUG(server::GetLogger(), "RegisterService");
101  m_replier.RegisterService(m_name,
102  std::static_pointer_cast<daqif::AsyncOcmDaqControl>(m_service));
103  }
104  void Unregister() {
105  LOG4CPLUS_DEBUG(server::GetLogger(), "UnregisterService");
106  m_replier.UnregisterService(m_name);
107  }
108 
109  /**
110  * Is notified of any DAQ status change and will post events to SM on DAQ activity flank
111  * changes.
112  *
113  * @note Current implementation assumes all DAQs begin in state NotStarted. When ocm supports
114  * loading old status from persistent storage this will no longer work as number of active daqs
115  * is not starting at zero and state is not starting at NotStarted.
116  */
118  try {
119  LOG4CPLUS_DEBUG(server::GetLogger(),
120  fmt::format("Publishing DAQ status for DAQ {}", status.GetId()));
121  auto state = status.GetState();
122  if (state == daq::State::NotStarted && !m_is_active) {
123  // New data acquisition
124  m_is_active = true;
125  LOG4CPLUS_DEBUG(server::GetLogger(), "Calling ProcessEvent");
126  m_state_machine.ProcessEvent(Events::AnyDaqActive{});
127  LOG4CPLUS_DEBUG(server::GetLogger(), "Calling ProcessEvent done");
128  } else if (m_is_active && daq::IsFinalState(status.GetState())) {
129  // At least one DAQ is completed. Check if all DAQs are completed.
130  auto daqs = m_manager.GetDaqControllers();
131  auto all_final =
132  std::all_of(daqs.begin(), daqs.end(), [](auto const shrd_daq) -> bool {
133  assert(shrd_daq);
134  return daq::IsFinalState(shrd_daq->GetState());
135  });
136  if (all_final) {
137  m_is_active = false;
138  // Active -> Idle
139  LOG4CPLUS_DEBUG(server::GetLogger(), "Calling ProcessEvent");
140  m_state_machine.ProcessEvent(Events::AllDaqInactive{});
141  LOG4CPLUS_DEBUG(server::GetLogger(), "Calling ProcessEvent done");
142  }
143  }
144  // publish state
145  *m_sample << status.GetStatus();
146  m_publisher.Publish(*m_sample);
147  LOG4CPLUS_DEBUG(server::GetLogger(),
148  fmt::format("Publishing DAQ status for DAQ {} done", status.GetId()));
149  } catch (std::exception const& e) {
150  LOG4CPLUS_ERROR(server::GetLogger(),
151  fmt::format("Failed to publish status: {}", e.what()));
152  }
153  }
154 
155 private:
156  std::string m_name;
157  daq::WorkspaceImpl m_workspace;
158  std::shared_ptr<mal::Mal> m_mal;
159  std::shared_ptr<daq::ObservableEventLog> m_event_log;
160  std::shared_ptr<daq::DpmClientImpl> m_dpm_client;
161  daq::DaqControllerFactoryDefault m_daq_factory;
162  daq::ManagerImpl m_manager;
163  std::shared_ptr<OcmDaqService> m_service;
164  rad::cii::Replier& m_replier;
165  rad::cii::Publisher<daqif::DaqStatus>& m_publisher;
166  rad::SMAdapter& m_state_machine;
167  bool m_is_active;
168 
169  boost::signals2::connection m_connection;
170  std::shared_ptr<daqif::DaqStatus> m_sample;
171 };
172 
173 /**
174  * Application main.
175  *
176  * @param[in] argc Number of command line options.
177  * @param[in] argv Command line options.
178  */
179 int main(int argc, char* argv[]) {
180  namespace fs = boost::filesystem;
181 
182  auto log_initializer = rad::LogInitializer();
183  LOG4CPLUS_INFO(server::GetLogger(), "Application ocm-server started.");
184  try {
185  /* Read only configuration */
186  server::Config config;
187  if (config.ParseOptions(argc, argv) == false) {
188  // request for help
189  return EXIT_SUCCESS;
190  }
191  /*
192  * Load CII/MAL middleware here because it resets
193  * the log4cplus configuration!
194  */
195  elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
196  auto mal = elt::mal::loadMal("zpb", {});
197  factory.registerMal("zpb", mal);
198 
199  config.LoadConfig();
200 
201  elt::log::CiiLogManager::Configure(
202  rad::GetDefaultLogProperties(config.GetProcName() + ".log", config.GetLogLevel()));
203  if (auto const& file = config.GetLogProperties(); !file.empty()) {
204  auto resolved = rad::Helper::FindFile(config.GetLogProperties());
205  if (resolved.empty()) {
206  LOG4CPLUS_ERROR(server::GetLogger(),
207  "Configured log property file not found: '"
208  << file << "', $CFGPATH=" << rad::Helper::GetEnvVar("CFGPATH"));
209  return EXIT_FAILURE;
210  }
211  elt::log::CiiLogManager::Configure(resolved);
212  }
213 
214  if (config.m_out_path.empty()) {
215  auto const msg =
216  "Output path not configured. Define environment "
217  "variable $DATAROOT or configuration " +
219  LOG4CPLUS_ERROR(server::GetLogger(), msg);
220  std::cerr << msg << std::endl;
221  return -1;
222  }
223  // Create out path if necessary with permissive permissions
224  if (!fs::is_directory(config.m_out_path)) {
225  LOG4CPLUS_INFO(server::GetLogger(),
226  fmt::format("Output dir '{}' is not a directory. Creating it now",
227  config.m_out_path));
228  fs::create_directories(config.m_out_path);
229  fs::permissions(config.m_out_path, fs::others_read | fs::owner_all | fs::group_all);
230  }
231 
232  auto workspace_root = config.GetWorkspace();
233  LOG4CPLUS_INFO(server::GetLogger(),
234  fmt::format("Using workspace directory: {}", workspace_root.native()));
235 
236  /*
237  * LAN 2020-07-09 EICSSW-717
238  * Create CII/MAL replier as soon as possible to avoid problems when
239  * an exceptions is thrown from and Action/Guard.
240  */
241  rad::cii::Replier mal_replier(daqif::MakeServerUri(config.GetMsgReplierEndpoint()));
242  auto std_status_publisher = std::make_unique<rad::cii::Publisher<stdif::Status>>(
243  daqif::MakeServiceUri(config.GetPubEndpoint(), "std/status"));
244  auto daq_status_publisher = std::make_unique<rad::cii::Publisher<daqif::DaqStatus>>(
245  daqif::MakeServiceUri(config.GetPubEndpoint(), "daq/status"));
246 
247  /* Runtime DB */
248  rad::OldbAsyncWriter oldb_writer(config.GetDbTimeout(), std::chrono::milliseconds(100));
249 
250  /* Runtime data context */
251  server::DataContext data_ctx(config, oldb_writer);
252  data_ctx.UpdateDb();
253 
254  /*
255  * Create event loop
256  */
257 
258  // event loop
259  boost::asio::io_context io_ctx;
260  rad::IoExecutor executor(io_ctx);
261 
262  /*
263  * State Machine related objects
264  */
265  // SM event queue and context
266  scxml4cpp::EventQueue external_events;
267  scxml4cpp::Context state_machine_ctx;
268 
269  // State Machine facade
270  rad::SMAdapter state_machine(io_ctx, &state_machine_ctx, external_events);
271 
272  daq::ManagerParams manager_params;
273  manager_params.instrument_id = config.m_instrument_id;
274  manager_params.acquiring_stale_age = config.m_stale_acquiring;
275  manager_params.merging_stale_age = config.m_stale_merging;
276 
277  // @todo: Load DPM RR URI and timeout
278  DaqService daq_service("daq",
279  manager_params,
280  workspace_root,
281  config.m_out_path,
282  executor,
283  mal,
284  mal_replier,
285  *daq_status_publisher,
286  state_machine,
287  config.GetDpmClientParams());
288 
289  // actions and activities
290  server::ActionMgr action_mgr;
291  action_mgr.CreateActions(io_ctx, state_machine, data_ctx);
292  action_mgr.CreateActivities(state_machine, data_ctx);
293  action_mgr.AddAction(new rad::ActionCallback("OcmDaq.RegisterService",
294  [&](auto) { daq_service.Register(); }));
295  action_mgr.AddAction(new rad::ActionCallback("OcmDaq.UnregisterService",
296  [&](auto) { daq_service.Unregister(); }));
297 
298  // Load SM model
299  state_machine.Load(
300  config.GetSmScxmlFilename(), &action_mgr.GetActions(), &action_mgr.GetActivities());
301 
302  // Register handlers to reject events
303  state_machine.RegisterDefaultRequestRejectHandler<Events::Init>();
304  state_machine.RegisterDefaultRequestRejectHandler<Events::Enable>();
305  state_machine.RegisterDefaultRequestRejectHandler<Events::Disable>();
306 
307  // Register publisher to export state information
308  state_machine.SetStatusPublisher([&](std::string const& status) {
309  auto ics_status = server::MakeStatusString(state_machine.GetActiveStates())
310  .value_or("NotOperational;Undefined");
311 
312  auto sample = std_status_publisher->CreateTopic();
313  sample->setStatus(ics_status);
314  sample->setSource(config.GetProcName());
315  std_status_publisher->Publish(*sample);
316  data_ctx.GetDbInterface().SetControlState(status);
317  });
318  /*
319  * Register CII/MAL replier
320  */
321  mal_replier.RegisterService<stdif::AsyncStdCmds>(
322  "std", std::make_shared<server::StdCmdsImpl>(state_machine));
323 
324  /*
325  * Start event loop
326  */
327  state_machine.Start();
328  oldb_writer.StartWriter();
329  try {
330  io_ctx.run();
331  } catch (std::exception const& e) {
332  LOG4CPLUS_ERROR(server::GetLogger(), "Exception from asio context: " << e.what());
333  }
334  state_machine.Stop();
335  } catch (rad::Exception& e) {
336  LOG4CPLUS_ERROR(server::GetLogger(), e.what());
337 
338  std::cout << "Exit main()\n";
339  return EXIT_FAILURE;
340  } catch (...) {
341  LOG4CPLUS_ERROR(server::GetLogger(), boost::current_exception_diagnostic_information());
342  std::cout << "Exit main()\n";
343  return EXIT_FAILURE;
344  }
345 
346  // to avoid valgrind warnings on potential memory loss
347  // google::protobuf::ShutdownProtobufLibrary();
348 
349  LOG4CPLUS_INFO(server::GetLogger(), "Application ocm-server terminated.");
350  std::cout << "Exit main()\n";
351  return EXIT_SUCCESS;
352 }
ActionMgr class header file.
ActionsStd class header file.
void Register()
Definition: main.cpp:99
void DaqStatusUpdate(daq::ObservableStatus const &status)
Is notified of any DAQ status change and will post events to SM on DAQ activity flank changes.
Definition: main.cpp:117
DaqService(std::string name, daq::ManagerParams manager_params, std::filesystem::path const &workspace, std::string const &output_path, rad::IoExecutor &executor, std::shared_ptr< mal::Mal > mal, rad::cii::Replier &replier, rad::cii::Publisher< daqif::DaqStatus > &publisher, rad::SMAdapter &state_machine, daq::DpmClientParams const &dpm_params)
Definition: main.cpp:53
void Unregister()
Definition: main.cpp:104
Default factory producing "real" implementations.
Implements daq::Manager.
Definition: manager.hpp:254
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
Definition: manager.cpp:127
std::vector< std::shared_ptr< DaqController const > > GetDaqControllers() override
Definition: manager.cpp:581
StatusSignal & GetStatusSignal() override
Definition: manager.cpp:577
Stores data acquisition status and allows subscription to status changes.
Definition: status.hpp:210
State GetState() const noexcept
Definition: status.cpp:270
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
Definition: status.cpp:357
std::string const & GetId() const noexcept
Definition: status.cpp:254
boost::signals2::connection ConnectObserver(Observer o)
Definition: manager.hpp:83
Implementation of daq::Workspace.
Definition: workspace.hpp:99
auto GetPath() const -> std::filesystem::path override
Definition: workspace.hpp:118
Adapter object intended to be used in contexts without direct access to the output-stream object.
Definition: report.hpp:54
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
Definition: ioExecutor.hpp:12
This class is responsible for the life-cycle management of actions and activities.
Definition: actionMgr.hpp:28
void CreateActivities(rad::SMAdapter &sm, DataContext &the_data)
Method to instantiate activity objects.
Definition: actionMgr.cpp:107
void CreateActions(boost::asio::io_service &ios, rad::SMAdapter &sm, DataContext &the_data)
Method to instantiate the action objects.
Definition: actionMgr.cpp:31
This class provides access to the command line options and the configuration parameters stored in the ...
Definition: config.hpp:60
const std::string & GetLogLevel() const
Definition: config.cpp:327
std::filesystem::path GetWorkspace() const
Definition: config.hpp:145
std::string m_out_path
Definition: config.hpp:171
const std::string & GetSmScxmlFilename() const
Definition: config.cpp:312
bool ParseOptions(int argc, char *argv[])
Disable assignment operator.
Definition: config.cpp:90
daq::DpmClientParams const & GetDpmClientParams() const
Definition: config.cpp:337
const std::string & GetMsgReplierEndpoint() const
Definition: config.cpp:289
void LoadConfig(const std::string &filename="")
This method loads the application configuration from a configuration file, overriding the initializatio...
Definition: config.cpp:166
std::chrono::seconds GetDbTimeout() const
Definition: config.cpp:308
std::chrono::hours m_stale_merging
Definition: config.hpp:175
std::chrono::hours m_stale_acquiring
Definition: config.hpp:174
std::string m_instrument_id
Definition: config.hpp:162
const std::string & GetLogProperties() const
Definition: config.cpp:332
const std::string & GetProcName() const
Definition: config.cpp:322
const std::string & GetPubEndpoint() const
Definition: config.cpp:294
This class provides access to the application run-time data including the in-memory DB.
Definition: dataContext.hpp:21
void UpdateDb()
Try to connect to the DB and update the application configuration.
Definition: dataContext.cpp:37
DbInterface & GetDbInterface()
Definition: dataContext.cpp:50
void SetControlState(const std::string &value)
Definition: dbInterface.cpp:35
daq::Workspace interface and implementation declaration
Contains support functions for daqif.
DataContext class header file.
DbInterface class header file.
daq::DpmClient
int main(int argc, char **argv)
Definition: main.cpp:68
Implements the MAL interface daqif::OcmDaq (async version).
Default logger name.
Declaration of daq::Manager
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
Definition: manager.hpp:53
std::string instrument_id
Instrument identifier.
Definition: manager.hpp:40
@ NotStarted
Initial state of data acquisition.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
Definition: state.cpp:15
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archiv...
Definition: manager.hpp:47
Connection parameters for DPM.
Definition: dpmClient.hpp:92
Configurations parameters directly related to manager.
Definition: manager.hpp:36
network::uri MakeServerUri(std::string uri)
Creates a server URI.
Definition: uri.cpp:13
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
Definition: uri.cpp:19
std::optional< std::string > MakeStatusString(std::list< scxml4cpp::State * > &&states)
Make a status string from active states (as returned from scxml4cpp::Executor::getStatus()).
Definition: actionsStd.cpp:29
const std::string KEY_CONFIG_DATAROOT
Definition: config.hpp:28
const std::string LOGGER_NAME_MANAGER
Definition: logger.hpp:18
log4cplus::Logger & GetLogger()
Definition: logger.cpp:14
Declaration of OcmDaqService.
Contains declaration for DaqController.
StdCmds Interface implementation header file.
Contains URI support functions for daqif.