20#include <events.rad.hpp>
22#include <ciiLogManager.hpp>
23#include <rad/actionCallback.hpp>
24#include <rad/exceptions.hpp>
25#include <rad/logger.hpp>
26#include <rad/mal/publisher.hpp>
27#include <rad/mal/replier.hpp>
28#include <rad/mal/utils.hpp>
29#include <rad/smAdapter.hpp>
31#include <scxml4cpp/Context.h>
32#include <scxml4cpp/EventQueue.h>
34#include <boost/asio.hpp>
35#include <boost/exception/diagnostic_information.hpp>
36#include <boost/filesystem.hpp>
37#include <log4cplus/hierarchy.h>
39#include <fmt/format.h>
57 std::filesystem::path
const& workspace,
59 std::string
const& output_path,
61 std::shared_ptr<mal::Mal>
mal,
62 rad::cii::Replier& replier,
63 rad::cii::Publisher<daqif::DaqStatus>& publisher,
64 rad::SMAdapter& state_machine,
66 : m_name(std::move(name))
67 , m_workspace(workspace)
68 , m_mal(std::move(
mal))
69 , m_event_log(std::make_shared<
daq::ObservableEventLog>())
71 std::make_shared<
daq::DpmClientImpl>(executor.get_io_context(), *m_mal, dpm_params))
72 , m_daq_factory(executor.get_io_context(), *m_mal, m_dpm_client)
74 std::move(manager_params),
80 log4cplus::Logger::getInstance(
server::LOGGER_NAME_MANAGER))
82 executor.get_io_context(), *m_mal, m_manager, m_name, output_path, m_event_log))
84 , m_publisher(publisher)
85 , m_state_machine(state_machine)
86 , m_is_active(false) {
87 m_sample = m_publisher.CreateTopic();
97 "Failed to restore state from OCM workspace at "
98 << m_workspace.
GetPath().native() <<
":\n"
105 m_replier.RegisterService(m_name,
106 std::static_pointer_cast<daqif::AsyncOcmDaqControl>(m_service));
110 m_replier.UnregisterService(m_name);
124 fmt::format(
"Publishing DAQ status for DAQ {}", status.
GetId()));
130 m_state_machine.ProcessEvent(Events::AnyDaqActive{});
136 std::all_of(daqs.begin(), daqs.end(), [](
auto const shrd_daq) ->
bool {
138 return daq::IsFinalState(shrd_daq->GetState());
144 m_state_machine.ProcessEvent(Events::AllDaqInactive{});
150 m_publisher.Publish(*m_sample);
152 fmt::format(
"Publishing DAQ status for DAQ {} done", status.
GetId()));
153 }
catch (std::exception
const& e) {
155 fmt::format(
"Failed to publish status: {}", e.what()));
162 std::shared_ptr<mal::Mal> m_mal;
163 std::shared_ptr<daq::ObservableEventLog> m_event_log;
164 std::shared_ptr<daq::DpmClientImpl> m_dpm_client;
167 std::shared_ptr<OcmDaqService> m_service;
168 rad::cii::Replier& m_replier;
169 rad::cii::Publisher<daqif::DaqStatus>& m_publisher;
170 rad::SMAdapter& m_state_machine;
173 boost::signals2::connection m_connection;
174 std::shared_ptr<daqif::DaqStatus> m_sample;
183int main(
int argc,
char* argv[]) {
184 namespace fs = boost::filesystem;
186 auto log_initializer = rad::LogInitializer();
199 elt::mal::CiiFactory& factory = elt::mal::CiiFactory::getInstance();
200 auto mal = elt::mal::loadMal(
"zpb", {});
201 factory.registerMal(
"zpb",
mal);
205 elt::log::CiiLogManager::SetApplicationName(config.
GetProcName());
206 auto* daq_logs = std::getenv(
"DAQ_LOGS");
207 elt::log::CiiLogManager::Configure(rad::GetDefaultLogProperties(
208 daq_logs ==
nullptr ? std::string()
209 : fmt::format(
"{}/{}.log", daq_logs, config.
GetProcName()),
213 if (resolved.empty()) {
215 "Configured log property file not found: '"
216 << file <<
"', $CFGPATH=" << rad::Helper::GetEnvVar(
"CFGPATH"));
219 elt::log::CiiLogManager::Configure(resolved);
224 "Output path not configured. Define environment "
225 "variable $DATAROOT or configuration " +
228 std::cerr << msg <<
'\n';
234 fmt::format(
"Output dir '{}' is not a directory. Creating it now",
237 fs::permissions(config.
m_out_path, fs::others_read | fs::owner_all | fs::group_all);
242 fmt::format(
"Using workspace directory: {}", workspace_root.native()));
250 auto std_status_publisher = std::make_unique<rad::cii::Publisher<stdif::Status>>(
252 auto daq_status_publisher = std::make_unique<rad::cii::Publisher<daqif::DaqStatus>>(
256 rad::OldbAsyncWriter oldb_writer(config.
GetDbTimeout(), std::chrono::milliseconds(100));
267 boost::asio::io_context io_ctx;
274 scxml4cpp::EventQueue external_events;
275 scxml4cpp::Context state_machine_ctx;
278 rad::SMAdapter state_machine(io_ctx, &state_machine_ctx, external_events);
285 std::unique_ptr<daq::fits::KeywordFormatter> kw_formatter;
287 auto ptr = std::make_unique<daq::DictKeywordFormatter>();
290 "Loaded the following dictionaries: " << ptr->GetDictionariesNames());
291 kw_formatter = std::move(ptr);
293 kw_formatter = std::make_unique<daq::fits::StandardKeywordFormatter>();
304 *daq_status_publisher,
312 action_mgr.AddAction(
new rad::ActionCallback(
"OcmDaq.RegisterService",
313 [&](
auto) { daq_service.
Register(); }));
314 action_mgr.AddAction(
new rad::ActionCallback(
"OcmDaq.UnregisterService",
322 state_machine.RegisterDefaultRequestRejectHandler<Events::Init>();
323 state_machine.RegisterDefaultRequestRejectHandler<Events::Enable>();
324 state_machine.RegisterDefaultRequestRejectHandler<Events::Disable>();
327 state_machine.SetStatusPublisher([&](std::string
const& status) {
329 .value_or(
"NotOperational;Undefined");
331 auto sample = std_status_publisher->CreateTopic();
332 sample->setStatus(ics_status);
334 std_status_publisher->Publish(*sample);
340 mal_replier.RegisterService<stdif::AsyncStdCmds>(
341 "std", std::make_shared<server::StdCmdsImpl>(state_machine));
346 state_machine.Start();
347 oldb_writer.StartWriter();
350 }
catch (std::exception
const& e) {
355 state_machine.Stop();
356 }
catch (std::exception
const& e) {
358 "Exception in application main: \n"
361 std::cout <<
"Exit main()\n";
365 "Unknown exception in application main: "
366 << boost::current_exception_diagnostic_information());
367 std::cout <<
"Exit main()\n";
375 std::cout <<
"Exit main()\n";
ActionMgr class header file.
ActionsStd class header file.
DaqService(std::string name, daq::ManagerParams manager_params, std::filesystem::path const &workspace, daq::fits::KeywordFormatter const &kw_formatter, std::string const &output_path, rad::IoExecutor &executor, std::shared_ptr< mal::Mal > mal, rad::cii::Replier &replier, rad::cii::Publisher< daqif::DaqStatus > &publisher, rad::SMAdapter &state_machine, daq::DpmClientParams const &dpm_params)
void DaqStatusUpdate(daq::ObservableStatus const &status)
Is notified of any DAQ status change and will post events to SM on DAQ activity flank changes.
Default factory producing "real" implementations.
void RestoreFromWorkspace() override
Loads status and constructs DaqControllers corresponding to stored state.
StatusSignal & GetStatusSignal() override
std::vector< std::shared_ptr< DaqController const > > GetDaqControllers() override
Stores data acquisition status and allows subscription to status changes.
State GetState() const noexcept
Status const & GetStatus() const noexcept
Connect observer that is invoked when state is modified.
std::string const & GetId() const noexcept
boost::signals2::connection ConnectObserver(Observer o)
Implementation of daq::Workspace.
auto GetPath() const -> std::filesystem::path override
Adapter object intended to be used in contexts without direct access to the output-stream object.
Adapts boost::asio::io_context into a compatible boost::thread Executor type.
This class is responsible for the life-cycle management of actions and activities.
void CreateActivities(rad::SMAdapter &sm, DataContext &the_data)
Method to instantiate activity objects.
void CreateActions(boost::asio::io_service &ios, rad::SMAdapter &sm, DataContext &the_data)
Method to instantiate the action objects.
This class provides access to the command line options and the configuration parameters stored in the ...
const std::string & GetLogLevel() const
std::filesystem::path GetWorkspace() const
const std::string & GetSmScxmlFilename() const
bool ParseOptions(int argc, char *argv[])
Disable assignment operator.
daq::DpmClientParams const & GetDpmClientParams() const
const std::string & GetMsgReplierEndpoint() const
void LoadConfig(const std::string &filename="")
This method loads the application configuration from a configuration file, overriding the initializatio...
std::chrono::seconds GetDbTimeout() const
std::chrono::hours m_stale_merging
std::chrono::hours m_stale_acquiring
std::string m_instrument_id
const std::string & GetLogProperties() const
const std::string & GetProcName() const
std::vector< std::string > const & GetDictionaryNames() const
const std::string & GetPubEndpoint() const
This class provides access to the application run-time data including the in-memory DB.
void UpdateDb()
Try to connect to the DB and update the application configuration.
DbInterface & GetDbInterface()
void SetControlState(const std::string &value)
daq::Workspace interface and implementation declaration
Contains support functions for daqif.
DataContext class header file.
DbInterface class header file.
int main(int argc, char **argv)
Implements the MAL interface daqif::OcmDaq (async version).
Contains data structure for FITS keywords.
Declaration of daq::Manager
std::chrono::hours merging_stale_age
Age of DAQ in merging state, after which it is automatically considered abandoned and will be archive...
std::string instrument_id
Instrument identifier.
@ NotStarted
Initial state of data acquisition.
bool IsFinalState(State state) noexcept
Query whether state is in a final state.
std::chrono::hours acquiring_stale_age
Age of DAQ in acquiring state after which it is automatically considered abandoned and will be archiv...
Connection parameters for DPM.
Configuration parameters directly related to manager.
network::uri MakeServerUri(std::string uri)
Creates a server URI.
network::uri MakeServiceUri(std::string base_uri, std::string_view service_path)
Creates a service URI of the form <baseuri>/<service>.
std::optional< std::string > MakeStatusString(std::list< scxml4cpp::State * > &&states)
Make a status string from active states (as returned from scxml4cpp::Executor::getStatus()).
const std::string KEY_CONFIG_DATAROOT
log4cplus::Logger & GetLogger()
Declaration of OcmDaqService.
Contains declaration for DaqController.
StdCmds Interface implementation header file.
Contains URI support functions for daqif.