19 #include <events.rad.hpp>
21 #include <ciiLogManager.hpp>
22 #include <rad/actionCallback.hpp>
23 #include <rad/exceptions.hpp>
24 #include <rad/logger.hpp>
25 #include <rad/mal/publisher.hpp>
26 #include <rad/mal/replier.hpp>
27 #include <rad/mal/utils.hpp>
28 #include <rad/smAdapter.hpp>
30 #include <scxml4cpp/Context.h>
31 #include <scxml4cpp/EventQueue.h>
33 #include <boost/asio.hpp>
34 #include <boost/exception/diagnostic_information.hpp>
35 #include <boost/filesystem.hpp>
36 #include <log4cplus/hierarchy.h>
38 #include <fmt/format.h>
55 std::filesystem::path
const& workspace,
56 std::string
const& output_path,
58 std::shared_ptr<mal::Mal>
mal,
59 rad::cii::Replier& replier,
60 rad::cii::Publisher<daqif::DaqStatus>& publisher,
61 rad::SMAdapter& state_machine,
63 : m_name(std::move(name))
64 , m_workspace(workspace)
65 , m_mal(std::move(
mal))
66 , m_event_log(std::make_shared<
daq::ObservableEventLog>())
68 std::make_shared<
daq::DpmClientImpl>(executor.get_io_context(), *m_mal, dpm_params))
69 , m_daq_factory(executor.get_io_context(), *m_mal, m_dpm_client)
71 std::move(manager_params),
78 executor.get_io_context(), *m_mal, m_manager, m_name, output_path, m_event_log))
80 , m_publisher(publisher)
81 , m_state_machine(state_machine)
82 , m_is_active(false) {
83 m_sample = m_publisher.CreateTopic();
93 "Failed to restore state from OCM workspace at "
94 << m_workspace.
GetPath().native() <<
":\n"
101 m_replier.RegisterService(m_name,
102 std::static_pointer_cast<daqif::AsyncOcmDaqControl>(m_service));
106 m_replier.UnregisterService(m_name);
120 fmt::format(
"Publishing DAQ status for DAQ {}", status.
GetId()));
126 m_state_machine.ProcessEvent(Events::AnyDaqActive{});
132 std::all_of(daqs.begin(), daqs.end(), [](
auto const shrd_daq) ->
bool {
134 return daq::IsFinalState(shrd_daq->GetState());
140 m_state_machine.ProcessEvent(Events::AllDaqInactive{});
146 m_publisher.Publish(*m_sample);
148 fmt::format(
"Publishing DAQ status for DAQ {} done", status.
GetId()));
149 }
catch (std::exception
const& e) {
151 fmt::format(
"Failed to publish status: {}", e.what()));
158 std::shared_ptr<mal::Mal> m_mal;
159 std::shared_ptr<daq::ObservableEventLog> m_event_log;
160 std::shared_ptr<daq::DpmClientImpl> m_dpm_client;
163 std::shared_ptr<OcmDaqService> m_service;
164 rad::cii::Replier& m_replier;
165 rad::cii::Publisher<daqif::DaqStatus>& m_publisher;
166 rad::SMAdapter& m_state_machine;
169 boost::signals2::connection m_connection;
170 std::shared_ptr<daqif::DaqStatus> m_sample;
179 int main(
int argc,
char* argv[]) {
180 namespace fs = boost::filesystem;
182 auto log_initializer = rad::LogInitializer();
195 elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
196 auto mal = elt::mal::loadMal(
"zpb", {});
197 factory.registerMal(
"zpb",
mal);
201 elt::log::CiiLogManager::Configure(
205 if (resolved.empty()) {
207 "Configured log property file not found: '"
208 << file <<
"', $CFGPATH=" << rad::Helper::GetEnvVar(
"CFGPATH"));
211 elt::log::CiiLogManager::Configure(resolved);
216 "Output path not configured. Define environment "
217 "variable $DATAROOT or configuration " +
220 std::cerr << msg << std::endl;
226 fmt::format(
"Output dir '{}' is not a directory. Creating it now",
229 fs::permissions(config.
m_out_path, fs::others_read | fs::owner_all | fs::group_all);
234 fmt::format(
"Using workspace directory: {}", workspace_root.native()));
242 auto std_status_publisher = std::make_unique<rad::cii::Publisher<stdif::Status>>(
244 auto daq_status_publisher = std::make_unique<rad::cii::Publisher<daqif::DaqStatus>>(
248 rad::OldbAsyncWriter oldb_writer(config.
GetDbTimeout(), std::chrono::milliseconds(100));
259 boost::asio::io_context io_ctx;
266 scxml4cpp::EventQueue external_events;
267 scxml4cpp::Context state_machine_ctx;
270 rad::SMAdapter state_machine(io_ctx, &state_machine_ctx, external_events);
285 *daq_status_publisher,
293 action_mgr.AddAction(
new rad::ActionCallback(
"OcmDaq.RegisterService",
294 [&](
auto) { daq_service.
Register(); }));
295 action_mgr.AddAction(
new rad::ActionCallback(
"OcmDaq.UnregisterService",
303 state_machine.RegisterDefaultRequestRejectHandler<Events::Init>();
304 state_machine.RegisterDefaultRequestRejectHandler<Events::Enable>();
305 state_machine.RegisterDefaultRequestRejectHandler<Events::Disable>();
308 state_machine.SetStatusPublisher([&](std::string
const& status) {
310 .value_or(
"NotOperational;Undefined");
312 auto sample = std_status_publisher->CreateTopic();
313 sample->setStatus(ics_status);
315 std_status_publisher->Publish(*sample);
321 mal_replier.RegisterService<stdif::AsyncStdCmds>(
322 "std", std::make_shared<server::StdCmdsImpl>(state_machine));
327 state_machine.Start();
328 oldb_writer.StartWriter();
331 }
catch (std::exception
const& e) {
332 LOG4CPLUS_ERROR(
server::GetLogger(),
"Exception from asio context: " << e.what());
334 state_machine.Stop();
335 }
catch (rad::Exception& e) {
338 std::cout <<
"Exit main()\n";
341 LOG4CPLUS_ERROR(
server::GetLogger(), boost::current_exception_diagnostic_information());
342 std::cout <<
"Exit main()\n";
350 std::cout <<
"Exit main()\n";
ActionMgr class header file.
ActionsStd class header file.
void DaqStatusUpdate(daq::ObservableStatus const &status)
Is notified of any DAQ status change and will post events to SM on DAQ activity flank changes.
DaqService(std::string name, daq::ManagerParams manager_params, std::filesystem::path const &workspace, std::string const &output_path, rad::IoExecutor &executor, std::shared_ptr< mal::Mal > mal, rad::cii::Replier &replier, rad::cii::Publisher< daqif::DaqStatus > &publisher, rad::SMAdapter &state_machine, daq::DpmClientParams const &dpm_params)
Default factory producing "real" implementations.
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
std::vector< std::shared_ptr< DaqController const > > GetDaqControllers() override
StatusSignal & GetStatusSignal() override
Stores data acquisition status and allows subscription to status changes.
State GetState() const noexcept
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
std::string const & GetId() const noexcept
boost::signals2::connection ConnectObserver(Observer o)
Implementation of daq::Workspace.
auto GetPath() const -> std::filesystem::path override
Adapter object intended to be used in contexts without direct access to the output-stream object.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
This class is responsible for the life-cycle management of actions and activities.
void CreateActivities(rad::SMAdapter &sm, DataContext &the_data)
Method to instantiate activity objects.
void CreateActions(boost::asio::io_service &ios, rad::SMAdapter &sm, DataContext &the_data)
Method to instantiate the action objects.
This class provide access to the command line options and the configuration parameters stored in the ...
const std::string & GetLogLevel() const
std::filesystem::path GetWorkspace() const
const std::string & GetSmScxmlFilename() const
bool ParseOptions(int argc, char *argv[])
Disable assignment operator.
daq::DpmClientParams const & GetDpmClientParams() const
const std::string & GetMsgReplierEndpoint() const
void LoadConfig(const std::string &filename="")
This method load from a configuration file the application configuration overriding the initializatio...
std::chrono::seconds GetDbTimeout() const
std::chrono::hours m_stale_merging
std::chrono::hours m_stale_acquiring
std::string m_instrument_id
const std::string & GetLogProperties() const
const std::string & GetProcName() const
const std::string & GetPubEndpoint() const
This class provide access to the application run-time data including the in-memory DB.
void UpdateDb()
Try to connect to the DB and update the application configuration.
DbInterface & GetDbInterface()
void SetControlState(const std::string &value)
daq::Workspace interface and implementation declaration
Contains support functions for daqif.
DataContext class header file.
DbInterface class header file.
int main(int argc, char **argv)
Implements the MAL interface daqif::OcmDaq (async version).
Declaration of daq::Manager
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
std::string instrument_id
Instrument identifier.
@ NotStarted
Initial state of data acquisition.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archiv...
Connection parameters for DPM.
Configurations parameters directly related to manager.
network::uri MakeServerUri(std::string uri)
Creates a server URI.
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
std::optional< std::string > MakeStatusString(std::list< scxml4cpp::State * > &&states)
Make a status string from active states (as returned from scxml4cpp::Executor::getStatus()).
const std::string KEY_CONFIG_DATAROOT
const std::string LOGGER_NAME_MANAGER
log4cplus::Logger & GetLogger()
Declaration of OcmDaqService.
Contains declaration for for DaqController.
StdCmds Interface implementation header file.
Contains URI support functions for daqif.