14 #include <log4cplus/loggingmacros.h>
15 #include <fmt/format.h>
16 #include <fmt/ostream.h>
17 #include <mal/Mal.hpp>
// Fragment of the routine that formats "<instrument prefix>.<timestamp>.<ms>".
// Its signature and the declarations of `tv`, `res`, `tm_time` and `frac_str`
// are not part of the visible lines.
//
// Offset to add to the current time: 0 s and `jitter * 1000` us
// (presumably `jitter` is given in milliseconds -- TODO confirm against the signature).
34 struct timeval jitter_tv {0, jitter * 1000};
// Read the wall-clock time; failure is reported as std::system_error carrying errno.
35 if (gettimeofday(&tv,
nullptr) != 0) {
37 throw std::system_error(errno, std::generic_category());
// res = tv + jitter_tv.
// NOTE(review): what happens to `res` afterwards is not visible here; the code
// below reads `tv` -- confirm `tv` is updated from `res` in the omitted lines.
42 timeradd(&tv, &jitter_tv, &res);
// Break the whole seconds down into calendar fields (gmtime_r yields UTC);
// failure is again reported as std::system_error carrying errno.
46 time_t time = tv.tv_sec;
47 if (gmtime_r(&time, &tm_time) ==
nullptr) {
49 throw std::system_error(errno, std::generic_category());
// Zero-initialised output buffer. The pieces written below need at most
// 6 (prefix) + 19 (timestamp) + 4 (fraction) = 29 characters.
52 char time_str[31] = {0};
// Prefix: at most the first five characters of instrument_id followed by '.'.
// `n` is snprintf's return value, i.e. the length of that prefix.
53 int n = snprintf(&time_str[0], 7,
"%.5s.", instrument_id);
// Date and time as YYYY-MM-DDTHH:MM:SS (19 characters plus terminator),
// written directly after the prefix.
55 strftime(&time_str[n], 20,
"%Y-%m-%dT%H:%M:%S", &tm_time);
// Fraction: '.' followed by tv_usec / 1000 truncated to int, printed with at
// least three digits (zero padded).
57 snprintf(&frac_str[0], 5,
".%.3d",
static_cast<int>(tv.tv_usec / 1000.0));
// Copy the four fraction characters to the position right after the
// 19-character timestamp.
59 strncpy(&time_str[n + 19], &frac_str[0], 4);
// Result length: prefix (n) + timestamp (19) + fraction (4).
60 return std::string(time_str, n + 23);
// Member initializer list; the constructor signature itself is outside the
// visible lines.
// m_alive_token: freshly allocated shared bool. Elsewhere in this file a
// std::weak_ptr to it is captured by an asynchronous continuation and tested
// with expired() to detect whether this object still exists.
64 : m_alive_token(std::make_shared<bool>())
65 , m_executor(executor)
// instrument_id is moved into the member rather than copied.
66 , m_instrument_id(std::move(instrument_id))
// Logger instance registered under the name "daq.manager".
67 , m_logger(log4cplus::Logger::getInstance(
"daq.manager")) {
// Destructor fragment (identified by the log message below): visits every
// entry registered in m_abort_funcs.
// NOTE(review): the loop body and the opening `try` are not part of the
// visible lines.
73 for (
auto& op : m_abort_funcs) {
76 }
catch (std::exception
const& e) {
// Exceptions derived from std::exception are logged as a warning on m_logger.
77 LOG4CPLUS_WARN(m_logger,
78 fmt::format(
"ManagerImpl::~ManagerImpl: Error when aborting "
88 for (
unsigned jitter=0; ; ++jitter) {
102 throw std::invalid_argument(
"DaqController with same id already exists");
104 m_daq_controllers.emplace_back(
daq->GetId(),
106 daq->GetStatus()->ConnectObserver(
108 this->m_status_signal.Signal(status);
111 m_status_signal.
Signal(*
daq->GetStatus());
115 auto&
daq = FindDaqOrThrow(
id);
116 return daq.GetStatus()->GetStatus();
// Constructs a Daq entry from its three parts. Each argument is taken by
// value and moved into the member of the corresponding name.
//
// id_arg          identifier, stored in `id`
// controller_arg  shared controller, stored in `controller`
// connection_arg  boost::signals2 connection, stored in `connection`
//
// Declared noexcept.
119 ManagerImpl::Daq::Daq(std::string id_arg,
120 std::shared_ptr<DaqController> controller_arg,
121 boost::signals2::connection connection_arg) noexcept
122 : id(std::move(id_arg))
123 , controller(std::move(controller_arg))
124 , connection(std::move(connection_arg)) {
// Const overload of FindDaq: returns a pointer-to-const controller for `id`.
// Implemented by casting away the constness of `this` and forwarding to the
// non-const overload, so the lookup logic exists only once; the result is
// converted back to a pointer-to-const by the return type.
127 DaqController
const* ManagerImpl::FindDaq(std::string_view
id)
const noexcept {
128 return const_cast<ManagerImpl*
>(
this)->FindDaq(
id);
// Looks up the controller registered under `id`.
// Performs a linear search over m_daq_controllers, comparing each entry's
// `id` member with the requested id.
// On a match the raw, non-owning pointer held by the entry's `controller`
// member is returned.
// NOTE(review): the not-found path is outside the visible lines.
131 DaqController* ManagerImpl::FindDaq(std::string_view
id) noexcept {
132 auto it = std::find_if(m_daq_controllers.begin(),
133 m_daq_controllers.end(),
134 [
id](
auto const&
daq) { return daq.id == id; });
// Match found: hand out the controller without transferring ownership.
135 if (it != m_daq_controllers.end()) {
136 return it->controller.get();
// Reference-returning variant of FindDaq.
// Delegates the lookup to FindDaq(id). The visible failure path throws
// std::invalid_argument with the message
// "DaqController with id '<id>' does not exist"; the exception is wrapped with
// boost::enable_current_exception before being thrown.
// NOTE(review): the check on `daq_ptr` and the success return are outside the
// visible lines.
141 DaqController& ManagerImpl::FindDaqOrThrow(std::string_view
id) {
142 auto daq_ptr = FindDaq(
id);
// The string_view is copied into a std::string for formatting.
145 throw boost::enable_current_exception(std::invalid_argument(
146 fmt::format(
"DaqController with id '{}' does not exist", std::string(
id))));
// Const overload of FindDaqOrThrow: returns a reference-to-const controller.
// Casts away the constness of `this` and forwards to the non-const overload,
// so any exception thrown there propagates unchanged to the caller.
151 DaqController
const& ManagerImpl::FindDaqOrThrow(std::string_view
id)
const {
152 return const_cast<ManagerImpl*
>(
this)->FindDaqOrThrow(
id);
159 return boost::make_exceptional_future<State>();
165 return FindDaqOrThrow(
id).
StopAsync(policy);
167 return boost::make_exceptional_future<Status>();
175 return boost::make_exceptional_future<Status>();
181 std::chrono::milliseconds timeout) {
// Body fragment of the member function that starts an op::AwaitStateAsync
// operation; only the tail of its signature is visible in this excerpt.
//
// Resolve the controller for `id`; FindDaqOrThrow throws std::invalid_argument
// when no controller with that id exists.
183 auto&
daq = FindDaqOrThrow(
id);
184 auto status =
daq.GetStatus();
// Fast path (its guarding condition is not part of the visible lines): log
// and return an already-completed future holding {false, *status}.
// NOTE(review): the meaning of the boolean field of Result<Status> is not
// shown here -- confirm against its declaration.
186 LOG4CPLUS_INFO(m_logger,
187 fmt::format(
"{}: Await condition already fulfilled.", *status));
189 return boost::make_ready_future<Result<Status>>({
false, *status});
// Otherwise start the abortable operation; it yields the result future and the
// function that aborts it.
191 auto [fut, abort] = op::InitiateAbortableOperation<op::AwaitStateAsync>(
// Register the abort function in m_abort_funcs; `ref` refers to the stored
// element.
194 auto& ref = m_abort_funcs.emplace_back(std::move(abort));
// This message is logged via the logger name instead of m_logger.
195 LOG4CPLUS_DEBUG(
"daq.manager",
196 fmt::format(
"op::AwaitStateAsync initiated. id={}",
// Lambda run on the operation's result (the call it is passed to is not
// visible). It captures `this`, the id of the abort function registered
// above, and a weak reference to m_alive_token.
200 [
this,
id = ref.GetId(), alive = std::weak_ptr<bool>(m_alive_token)](
auto res) {
201 LOG4CPLUS_DEBUG(
"daq.manager",
202 fmt::format(
"op::AwaitStateAsync completed. id={}",
// True while the token owned by the manager has not been destroyed;
// presumably this guards any later use of `this` -- the guarded code is not
// visible here.
206 auto is_alive = !alive.expired();
// boost::make_exceptional_future called without an argument stores the
// exception currently being handled; presumably this sits in a catch handler
// that is not part of the visible lines.
215 return boost::make_exceptional_future<Result<Status>>();
220 return FindDaqOrThrow(
id).UpdateKeywords(keywords);
224 return m_status_signal;
// Collects the controllers of all entries in m_daq_controllers.
// The result holds shared pointers to const controllers, in the same order as
// m_daq_controllers. Capacity is reserved up front so the appends below do not
// reallocate.
// NOTE(review): the return statement is outside the visible lines.
227 std::vector<std::shared_ptr<DaqController const>> ManagerImpl::GetDaqControllers() {
228 std::vector<std::shared_ptr<DaqController const>> controllers;
229 controllers.reserve(m_daq_controllers.size());
// Append a copy of each entry's `controller` shared pointer.
230 std::transform(m_daq_controllers.begin(),
231 m_daq_controllers.end(),
232 std::back_inserter(controllers),
233 [](
auto const&
daq) { return daq.controller; });
// Removes every entry of m_abort_funcs whose GetId() equals `id`.
// Uses remove_if to move the matching entries to the tail, then erase to drop
// that tail. Declared noexcept.
// NOTE(review): two original lines of this function are not part of the
// visible excerpt.
237 void ManagerImpl::RemoveAbortFunc(uint64_t
id) noexcept {
239 m_abort_funcs.erase(std::remove_if(m_abort_funcs.begin(),
241 [
id](
auto const& obj) { return id == obj.GetId(); }),
242 m_abort_funcs.end());
// Constructs an OpAbortFunc from an rvalue callable, which is moved into
// m_func.
// NOTE(review): the first member initializer is outside the visible lines;
// presumably it assigns the id reported by GetId() -- confirm.
246 ManagerImpl::OpAbortFunc::OpAbortFunc(OpAbortFunc::Func&& func)
248 , m_func(std::move(func)) {
252 uint64_t ManagerImpl::OpAbortFunc::GetId() const noexcept {
257 bool ManagerImpl::OpAbortFunc::Abort() noexcept {
261 uint64_t ManagerImpl::OpAbortFunc::NextId() {
262 static uint64_t next_id = 0;