HLCC Documentation 2.2.0
Loading...
Searching...
No Matches
observablePublisher.hpp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: 2020-2025 European Southern Observatory (ESO)
2//
3// SPDX-License-Identifier: LGPL-3.0-only
4
5#ifndef HLCC_OLDBMUX_CIIPUBLISHER_HPP_
6#define HLCC_OLDBMUX_CIIPUBLISHER_HPP_
7
8#include <string>
9#include <functional>
10#include <memory>
11#include <variant>
12
13#include <mal/Cii.hpp>
14#include <mal/utility/LoadMal.hpp>
15#include <mal/ps/qos/QoS.hpp>
16#include <mal/ps/Publisher.hpp>
17#include <mal/utility/Uri.hpp>
18
19#include <rad/oldbInterface.hpp>
20#include <taiclock/taiClock.hpp>
21
22#include <ciiLogManager.hpp>
23
24namespace hlcc::oldbmux {
25
32 std::chrono::nanoseconds malpub_elapsed;
33 std::chrono::nanoseconds observers_elapsed;
34};
35
40template<typename T>
42
43using ObsCallback = std::function<void (const T&)>;
44using ObsCallbackWithTimestamp = std::function<void (const T&, const taiclock::TaiClock::time_point&)>;
45using ObsCallbackWithDpQuality = std::function<void (const T&, const elt::oldb::CiiOldbDpQuality&)>;
46using ObsCallbackWithTimestampAndDpQuality =
47 std::function<void (const T&, const taiclock::TaiClock::time_point&, const elt::oldb::CiiOldbDpQuality&)>;
48using ObsVariant = std::variant<ObsCallback, ObsCallbackWithTimestamp, ObsCallbackWithDpQuality, ObsCallbackWithTimestampAndDpQuality>;
49
50public:
51
58 explicit ObservablePublisher(std::unique_ptr<::elt::mal::ps::Publisher<T>> malpub, bool use_malpub = true);
59
60 // Accepts the callback functions without user timestamp (internal timestamping)
61 void AddObserver(ObsCallback observer_function);
62
63 // Accepts the callback functions with user-provided timestamp parameter
64 void AddObserver(ObsCallbackWithTimestamp observer_function);
65
66 // Accepts the callback functions with Oldb datapoint quality
67 void AddObserver(ObsCallbackWithDpQuality observer_function);
68
69 // Accepts the callback functions with user-provided timestamp parameter and Oldb datapoint quality
70 void AddObserver(ObsCallbackWithTimestampAndDpQuality observer_function);
71
72 std::shared_ptr<T> createDataEntity();
73
75 const std::chrono::milliseconds& timeout,
76 const taiclock::TaiClock::time_point& timestamp = taiclock::TaiClock::now(),
77 const elt::oldb::CiiOldbDpQuality& dp_quality = elt::oldb::CiiOldbDpQuality::OK) const;
78
79private:
80 const log4cplus::Logger logger = elt::log::CiiLogManager::GetLogger();
81 const std::unique_ptr<::elt::mal::ps::Publisher<T>> m_malpub;
82 const bool m_use_malpub;
83 // Vector that will hold the observer functions in a variant due to both possibilities
84 // with and without timestamp parameter
85 std::vector<ObsVariant> m_observers;
86};
87
88template<typename T>
89ObservablePublisher<T>::ObservablePublisher(std::unique_ptr<::elt::mal::ps::Publisher<T>> malpub, bool use_malpub)
90: m_malpub {std::move(malpub)}, m_use_malpub {use_malpub} {
91}
92
93template<typename T>
94void ObservablePublisher<T>::AddObserver(ObsCallback observer_function) {
95 m_observers.push_back(observer_function);
96}
97
98template<typename T>
99void ObservablePublisher<T>::AddObserver(ObsCallbackWithTimestamp observer_function) {
100 m_observers.push_back(observer_function);
101}
102
103template<typename T>
104void ObservablePublisher<T>::AddObserver(ObsCallbackWithDpQuality observer_function) {
105 m_observers.push_back(observer_function);
106}
107
108template<typename T>
109void ObservablePublisher<T>::AddObserver(ObsCallbackWithTimestampAndDpQuality observer_function) {
110 m_observers.push_back(observer_function);
111}
112
113template<typename T>
115 return m_malpub->createDataEntity();
116}
117
118template<typename T>
120 const std::chrono::milliseconds& timeout,
121 const taiclock::TaiClock::time_point& timestamp,
122 const elt::oldb::CiiOldbDpQuality& dp_quality) const {
123 auto t_0 = std::chrono::steady_clock::now();
124 if (m_malpub != nullptr && m_use_malpub) {
125 logger.log(log4cplus::DEBUG_LOG_LEVEL, "publishing over MAL");
126 try {
127 m_malpub->publish(data, timeout);
128 } catch (std::exception& exc) {
129 std::string cause = exc.what();
130 logger.log(log4cplus::ERROR_LOG_LEVEL, "Ignoring: publishing to MAL failed, due to: " + cause);
131 }
132 }
133
134 auto t_1 = std::chrono::steady_clock::now();
135 logger.log(log4cplus::DEBUG_LOG_LEVEL, "notifying observers");
136 for (ObsVariant obs_func : m_observers) {
137 try {
138 // Check if variant contains callback function without timestamp
139 if (std::holds_alternative<ObsCallback>(obs_func)) {
140 auto func = std::get<ObsCallback>(obs_func);
141 func(data);
142 }
143 // Check if variant contains callback function with timestamp
144 else if (std::holds_alternative<ObsCallbackWithTimestamp>(obs_func)) {
145 auto func = std::get<ObsCallbackWithTimestamp>(obs_func);
146 func(data, timestamp);
147 }
148 // Check if variant contains callback function with Oldb dp quality
149 else if (std::holds_alternative<ObsCallbackWithDpQuality>(obs_func)) {
150 auto func = std::get<ObsCallbackWithDpQuality>(obs_func);
151 func(data, dp_quality);
152 }
153 // Check if variant contains callback function with timestamp & Oldb dp quality
154 else if (std::holds_alternative<ObsCallbackWithTimestampAndDpQuality>(obs_func)) {
155 auto func = std::get<ObsCallbackWithTimestampAndDpQuality>(obs_func);
156 func(data, timestamp, dp_quality);
157 }
158 // If variant is empty (should not be possible) log an error
159 else {
160 logger.log(log4cplus::ERROR_LOG_LEVEL, "Observable function variant holds NO valid"
161 "alternative.");
162 }
163 } catch (std::exception& exc) {
164 std::string cause = exc.what();
165 logger.log(log4cplus::DEBUG_LOG_LEVEL, "Ignoring: callback to observer failed, due to: " + cause);
166 }
167 }
168 logger.log(log4cplus::DEBUG_LOG_LEVEL, "publish done");
169
170 return PublishingTimeElapsed {(t_1 - t_0), (std::chrono::steady_clock::now() - t_1)};
171}
172
173}
174// namespace
175
176#endif
Definition observablePublisher.hpp:41
ObservablePublisher(std::unique_ptr<::elt::mal::ps::Publisher< T > > malpub, bool use_malpub=true)
Definition observablePublisher.hpp:89
void AddObserver(ObsCallback observer_function)
Definition observablePublisher.hpp:94
PublishingTimeElapsed Publish(const T &data, const std::chrono::milliseconds &timeout, const taiclock::TaiClock::time_point &timestamp=taiclock::TaiClock::now(), const elt::oldb::CiiOldbDpQuality &dp_quality=elt::oldb::CiiOldbDpQuality::OK) const
Definition observablePublisher.hpp:119
std::shared_ptr< T > createDataEntity()
Definition observablePublisher.hpp:114
Definition ciiOldbDataPointAsync.hpp:34
ccsinsdetifllnetio::PointingKernelPositions data
Definition pkp_llnetio_subscriber.cpp:33
Definition observablePublisher.hpp:31
std::chrono::nanoseconds observers_elapsed
Definition observablePublisher.hpp:33
std::chrono::nanoseconds malpub_elapsed
Definition observablePublisher.hpp:32