43using ObsCallback = std::function<void (
const T&)>;
44using ObsCallbackWithTimestamp = std::function<void (
const T&,
const taiclock::TaiClock::time_point&)>;
45using ObsCallbackWithDpQuality = std::function<void (
const T&,
const elt::oldb::CiiOldbDpQuality&)>;
46using ObsCallbackWithTimestampAndDpQuality =
47 std::function<void (
const T&,
const taiclock::TaiClock::time_point&,
const elt::oldb::CiiOldbDpQuality&)>;
48using ObsVariant = std::variant<ObsCallback, ObsCallbackWithTimestamp, ObsCallbackWithDpQuality, ObsCallbackWithTimestampAndDpQuality>;
58 explicit ObservablePublisher(std::unique_ptr<::elt::mal::ps::Publisher<T>> malpub,
bool use_malpub =
true);
64 void AddObserver(ObsCallbackWithTimestamp observer_function);
67 void AddObserver(ObsCallbackWithDpQuality observer_function);
70 void AddObserver(ObsCallbackWithTimestampAndDpQuality observer_function);
75 const std::chrono::milliseconds& timeout,
76 const taiclock::TaiClock::time_point& timestamp = taiclock::TaiClock::now(),
77 const elt::oldb::CiiOldbDpQuality& dp_quality = elt::oldb::CiiOldbDpQuality::OK)
const;
80 const log4cplus::Logger logger = elt::log::CiiLogManager::GetLogger();
81 const std::unique_ptr<::elt::mal::ps::Publisher<T>> m_malpub;
82 const bool m_use_malpub;
85 std::vector<ObsVariant> m_observers;
120 const std::chrono::milliseconds& timeout,
121 const taiclock::TaiClock::time_point& timestamp,
122 const elt::oldb::CiiOldbDpQuality& dp_quality)
const {
123 auto t_0 = std::chrono::steady_clock::now();
124 if (m_malpub !=
nullptr && m_use_malpub) {
125 logger.log(log4cplus::DEBUG_LOG_LEVEL,
"publishing over MAL");
127 m_malpub->publish(
data, timeout);
128 }
catch (std::exception& exc) {
129 std::string cause = exc.what();
130 logger.log(log4cplus::ERROR_LOG_LEVEL,
"Ignoring: publishing to MAL failed, due to: " + cause);
134 auto t_1 = std::chrono::steady_clock::now();
135 logger.log(log4cplus::DEBUG_LOG_LEVEL,
"notifying observers");
136 for (ObsVariant obs_func : m_observers) {
139 if (std::holds_alternative<ObsCallback>(obs_func)) {
140 auto func = std::get<ObsCallback>(obs_func);
144 else if (std::holds_alternative<ObsCallbackWithTimestamp>(obs_func)) {
145 auto func = std::get<ObsCallbackWithTimestamp>(obs_func);
146 func(
data, timestamp);
149 else if (std::holds_alternative<ObsCallbackWithDpQuality>(obs_func)) {
150 auto func = std::get<ObsCallbackWithDpQuality>(obs_func);
151 func(
data, dp_quality);
154 else if (std::holds_alternative<ObsCallbackWithTimestampAndDpQuality>(obs_func)) {
155 auto func = std::get<ObsCallbackWithTimestampAndDpQuality>(obs_func);
156 func(
data, timestamp, dp_quality);
160 logger.log(log4cplus::ERROR_LOG_LEVEL,
"Observable function variant holds NO valid"
163 }
catch (std::exception& exc) {
164 std::string cause = exc.what();
165 logger.log(log4cplus::DEBUG_LOG_LEVEL,
"Ignoring: callback to observer failed, due to: " + cause);
168 logger.log(log4cplus::DEBUG_LOG_LEVEL,
"publish done");
PublishingTimeElapsed Publish(const T &data, const std::chrono::milliseconds &timeout, const taiclock::TaiClock::time_point ×tamp=taiclock::TaiClock::now(), const elt::oldb::CiiOldbDpQuality &dp_quality=elt::oldb::CiiOldbDpQuality::OK) const
Definition observablePublisher.hpp:119