19 #include <events.rad.hpp>
21 #include <ciiLogManager.hpp>
22 #include <rad/actionCallback.hpp>
23 #include <rad/exceptions.hpp>
24 #include <rad/logger.hpp>
25 #include <rad/mal/publisher.hpp>
26 #include <rad/mal/replier.hpp>
27 #include <rad/mal/utils.hpp>
28 #include <rad/smAdapter.hpp>
30 #include <scxml4cpp/Context.h>
31 #include <scxml4cpp/EventQueue.h>
33 #include <boost/asio.hpp>
34 #include <boost/exception/diagnostic_information.hpp>
35 #include <boost/filesystem.hpp>
36 #include <log4cplus/hierarchy.h>
38 #include <fmt/format.h>
55 std::filesystem::path
const& workspace,
56 std::string
const& output_path,
58 std::shared_ptr<mal::Mal>
mal,
59 rad::cii::Replier& replier,
60 rad::cii::Publisher<daqif::DaqStatus>& publisher,
61 rad::SMAdapter& state_machine,
63 : m_name(std::move(name))
64 , m_workspace(workspace)
65 , m_mal(std::move(
mal))
66 , m_event_log(std::make_shared<
daq::ObservableEventLog>())
68 std::make_shared<
daq::DpmClientImpl>(executor.get_io_context(), *m_mal, dpm_params))
69 , m_daq_factory(executor.get_io_context(), *m_mal, m_dpm_client)
71 std::move(manager_params),
77 executor.get_io_context(), *m_mal, m_manager, m_name, output_path, m_event_log))
79 , m_publisher(publisher)
80 , m_state_machine(state_machine)
81 , m_is_active(false) {
82 m_sample = m_publisher.CreateTopic();
92 "Failed to restore state from OCM workspace at "
93 << m_workspace.
GetPath().native() <<
":\n"
100 m_replier.RegisterService(m_name,
101 std::static_pointer_cast<daqif::AsyncOcmDaqControl>(m_service));
105 m_replier.UnregisterService(m_name);
119 fmt::format(
"Publishing DAQ status for DAQ {}", status.
GetId()));
125 m_state_machine.ProcessEvent(Events::AnyDaqActive{});
131 std::all_of(daqs.begin(), daqs.end(), [](
auto const shrd_daq) ->
bool {
133 return daq::IsFinalState(shrd_daq->GetState());
139 m_state_machine.ProcessEvent(Events::AllDaqInactive{});
145 m_publisher.Publish(*m_sample);
147 fmt::format(
"Publishing DAQ status for DAQ {} done", status.
GetId()));
148 }
catch (std::exception
const& e) {
150 fmt::format(
"Failed to publish status: {}", e.what()));
157 std::shared_ptr<mal::Mal> m_mal;
158 std::shared_ptr<daq::ObservableEventLog> m_event_log;
159 std::shared_ptr<daq::DpmClientImpl> m_dpm_client;
162 std::shared_ptr<OcmDaqService> m_service;
163 rad::cii::Replier& m_replier;
164 rad::cii::Publisher<daqif::DaqStatus>& m_publisher;
165 rad::SMAdapter& m_state_machine;
168 boost::signals2::connection m_connection;
169 std::shared_ptr<daqif::DaqStatus> m_sample;
178 int main(
int argc,
char* argv[]) {
179 namespace fs = boost::filesystem;
181 auto log_initializer = rad::LogInitializer();
194 elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
195 auto mal = elt::mal::loadMal(
"zpb", {});
196 factory.registerMal(
"zpb",
mal);
200 elt::log::CiiLogManager::Configure(
204 if (resolved.empty()) {
206 "Configured log property file not found: '"
207 << file <<
"', $CFGPATH=" << rad::Helper::GetEnvVar(
"CFGPATH"));
210 elt::log::CiiLogManager::Configure(resolved);
215 "Output path not configured. Define environment "
216 "variable $DATAROOT or configuration " +
219 std::cerr << msg << std::endl;
225 fmt::format(
"Output dir '{}' is not a directory. Creating it now",
228 fs::permissions(config.
m_out_path, fs::others_read | fs::owner_all | fs::group_all);
233 fmt::format(
"Using workspace directory: {}", workspace_root.native()));
241 auto std_status_publisher = std::make_unique<rad::cii::Publisher<stdif::Status>>(
243 auto daq_status_publisher = std::make_unique<rad::cii::Publisher<daqif::DaqStatus>>(
247 rad::OldbAsyncWriter oldb_writer(config.
GetDbTimeout(), std::chrono::milliseconds(100));
258 boost::asio::io_context io_ctx;
265 scxml4cpp::EventQueue external_events;
266 scxml4cpp::Context state_machine_ctx;
269 rad::SMAdapter state_machine(io_ctx, &state_machine_ctx, external_events);
284 *daq_status_publisher,
292 action_mgr.AddAction(
new rad::ActionCallback(
"OcmDaq.RegisterService",
293 [&](
auto) { daq_service.
Register(); }));
294 action_mgr.AddAction(
new rad::ActionCallback(
"OcmDaq.UnregisterService",
302 state_machine.RegisterDefaultRequestRejectHandler<Events::Init>();
303 state_machine.RegisterDefaultRequestRejectHandler<Events::Enable>();
304 state_machine.RegisterDefaultRequestRejectHandler<Events::Disable>();
307 state_machine.SetStatusPublisher([&](std::string
const& status) {
309 .value_or(
"NotOperational;Undefined");
311 auto sample = std_status_publisher->CreateTopic();
312 sample->setStatus(ics_status);
314 std_status_publisher->Publish(*sample);
320 mal_replier.RegisterService<stdif::AsyncStdCmds>(
321 "std", std::make_shared<server::StdCmdsImpl>(state_machine));
326 state_machine.Start();
327 oldb_writer.StartWriter();
329 state_machine.Stop();
330 }
catch (rad::Exception& e) {
333 std::cout <<
"Exit main()\n";
336 LOG4CPLUS_ERROR(
server::GetLogger(), boost::current_exception_diagnostic_information());
337 std::cout <<
"Exit main()\n";
345 std::cout <<
"Exit main()\n";
ActionMgr class header file.
ActionsStd class header file.
void DaqStatusUpdate(daq::ObservableStatus const &status)
Is notified of any DAQ status change and will post events to SM on DAQ activity flank changes.
DaqService(std::string name, daq::ManagerParams manager_params, std::filesystem::path const &workspace, std::string const &output_path, rad::IoExecutor &executor, std::shared_ptr< mal::Mal > mal, rad::cii::Replier &replier, rad::cii::Publisher< daqif::DaqStatus > &publisher, rad::SMAdapter &state_machine, daq::DpmClientParams const &dpm_params)
Default factory producing "real" implementations.
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
std::vector< std::shared_ptr< DaqController const > > GetDaqControllers() override
StatusSignal & GetStatusSignal() override
Stores data acquisition status and allows subscription to status changes.
State GetState() const noexcept
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
std::string const & GetId() const noexcept
boost::signals2::connection ConnectObserver(Observer o)
Implementation of daq::Workspace.
auto GetPath() const -> std::filesystem::path override
Adapter object intended to be used in contexts without direct access to the output-stream object.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
This class is responsible for the life-cycle management of actions and activities.
void CreateActivities(rad::SMAdapter &sm, DataContext &the_data)
Method to instantiate activity objects.
void CreateActions(boost::asio::io_service &ios, rad::SMAdapter &sm, DataContext &the_data)
Method to instantiate the action objects.
This class provides access to the command line options and the configuration parameters stored in the ...
const std::string & GetLogLevel() const
std::filesystem::path GetWorkspace() const
const std::string & GetSmScxmlFilename() const
bool ParseOptions(int argc, char *argv[])
Disable assignment operator.
daq::DpmClientParams const & GetDpmClientParams() const
const std::string & GetMsgReplierEndpoint() const
void LoadConfig(const std::string &filename="")
This method loads from a configuration file the application configuration overriding the initializatio...
std::chrono::seconds GetDbTimeout() const
std::chrono::hours m_stale_merging
std::chrono::hours m_stale_acquiring
std::string m_instrument_id
const std::string & GetLogProperties() const
const std::string & GetProcName() const
const std::string & GetPubEndpoint() const
This class provides access to the application run-time data including the in-memory DB.
void UpdateDb()
Try to connect to the DB and update the application configuration.
DbInterface & GetDbInterface()
void SetControlState(const std::string &value)
daq::Workspace interface and implementation declaration
Contains support functions for daqif.
DataContext class header file.
DbInterface class header file.
int main(int argc, char **argv)
Implements the MAL interface daqif::OcmDaq (async version).
Declaration of daq::Manager
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
std::string instrument_id
Instrument identifier.
@ NotStarted
Initial state of data acquisition.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archiv...
Connection parameters for DPM.
Configurations parameters directly related to manager.
network::uri MakeServerUri(std::string uri)
Creates a server URI.
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
std::optional< std::string > MakeStatusString(std::list< scxml4cpp::State * > &&states)
Make a status string from active states (as returned from scxml4cpp::Executor::getStatus()).
const std::string KEY_CONFIG_DATAROOT
const std::string LOGGER_NAME_MANAGER
log4cplus::Logger & GetLogger()
Declaration of OcmDaqService.
Contains declaration for DaqController.
StdCmds Interface implementation header file.
Contains URI support functions for daqif.