HLCC Documentation 2.2.0
Loading...
Searching...
No Matches
ciiOldbDataPointAsync.ipp
Go to the documentation of this file.
1
19#include <chrono>
21
22
23namespace hlcc::oldbmux {
24
25//--------------------------------------------------------------+
26// operator<< |
27// |
28// The buffer DiscardListener currently logs the dropped value. |
29// For non-primitive types, we need a stream insertion operator |
30// matching the template type T. |
31// |
32// Is it OK to "pollute" namespace hlcc::oldbmux with these |
33// operator definitions? Is there a better solution? |
34// Perhaps to somehow recognize a vector type in the discard |
35// listener lambda impl and to iterate there? |
36// |
37// For more data types, should their insertion operators be |
38// added centrally here, or should the user provide them in |
39// client code? |
40//--------------------------------------------------------------+
41
template <typename T>
inline std::ostream& operator<<(std::ostream& os, const std::vector<T>& vec) {
    // Renders the vector as "{a, b, c}" (empty vector -> "{}") so that the
    // buffer DiscardListener can log vector-valued data points.
    os << "{";
    const char* separator = "";
    for (const T& element : vec) {
        os << separator << element;
        separator = ", ";
    }
    os << "}";
    return os;
}
54
55
56//--------------------------------------------------------------+
57// CiiOldbDataPointAsync |
58// |
59//--------------------------------------------------------------+
60
61template<typename T>
63 std::string name,
64 std::shared_ptr<elt::oldb::CiiOldbDataPoint<T>> delegate,
65 boost::asio::thread_pool& async_exec,
66 const log4cplus::Logger& logger,
67 std::size_t buffer_capacity )
68 : logger {logger},
69 name {name},
70 delegate {delegate},
71 buffer {name, buffer_capacity, logger},
72 async_exec {async_exec},
73 is_async_processing {false}
74{
75
76 buffer.SetDiscardListener(
77 [&logger = this->logger, &name = this->name, &buffer=this->buffer](OldbDataWithPromise& discarded_data) {
78
79 // Check the oldest remaining elements in the buffer, if it is incomplete. If our discarded_data has the missing fields,
80 // then they should be transferred to not get lost.
81 auto lock = buffer.Lock(); // should already be locked by CircularBufferConcurrent::Push calling us, but we do it also here for clarity.
82 boost::circular_buffer<OldbDataWithPromise>& cb = buffer.GetCb();
83 OldbDataWithPromise& oldest_retained_data = *cb.begin();
84
85 std::ostringstream msg {};
86
87 if (discarded_data.GetValue()) {
88 msg << "value=" << *discarded_data.GetValue();
89 if (!oldest_retained_data.GetValue()) {
90 oldest_retained_data.SetValue(*discarded_data.GetValue());
91 msg << " (transferred to retained newer data)";
92 }
93 msg << ", ";
94 }
95
96 if (discarded_data.GetTimestamp()) {
97 msg << "timestamp=" << *discarded_data.GetTimestamp() << ", ";
98 if (!oldest_retained_data.GetTimestamp()) {
99 oldest_retained_data.SetTimestamp(*discarded_data.GetTimestamp());
100 msg << " (transferred to retained newer data)";
101 }
102 msg << ", ";
103 }
104
105 if (discarded_data.GetQuality()) {
106 std::string quality_str = hlcc::cpputil::CiiOldbDpQualityToString(*discarded_data.GetQuality());
107 msg << "quality=" << quality_str;
108 // no need to transfer, since data produced with WriteValue has quality default value,
109 // and data produced with SetQuality anyway has the quality.
110 }
111 LOG4CPLUS_DEBUG(logger, "OLDB async (" << name << ") discarded data: " << msg.str());
112
113 // If a DP value was discarded from the buffer, we need to notify the client through its future object.
114 // TODO: Is there a standardized (std or ELT etc) exception for cancelled operations? std::out_of_range does not really fit..
115 // TODO use std::make_exception_ptr if we use std::promise instead of boost.
116 discarded_data.GetPromise()->set_exception(boost::copy_exception(std::out_of_range{"Dropped data."}));
117 });
118}
119
120
121template<typename T>
125
126
127template<typename T>
128std::shared_ptr<elt::oldb::CiiOldbDpValue<T>> CiiOldbDataPointAsync<T>::ReadValue(bool check_bad_quality) {
129 return delegate->ReadValue(check_bad_quality);
130}
131
132
139template<typename T>
140boost::future<typename CiiOldbDataPointAsync<T>::OldbData> CiiOldbDataPointAsync<T>::WriteValue(
141 const T& value, int64_t timestamp, elt::oldb::CiiOldbDpQuality quality, bool is_disable_publishing) {
142
143 // Note that we store a copy of value in new_data. Thus no lifetime issues with our "T& value" parameter.
144 typename CiiOldbDataPointAsync<T>::OldbData new_data {value, timestamp, quality};
145 return CiiOldbDataPointAsync<T>::WriteAsync(std::move(new_data), is_disable_publishing);
146}
147
148
149template<typename T>
150boost::future<typename CiiOldbDataPointAsync<T>::OldbData> CiiOldbDataPointAsync<T>::SetQuality(
151 elt::oldb::CiiOldbDpQuality quality, bool is_disable_publishing) {
152
153 typename CiiOldbDataPointAsync<T>::OldbData new_data {quality};
154 return CiiOldbDataPointAsync<T>::WriteAsync(std::move(new_data), is_disable_publishing);
155}
156
157
158template<typename T>
159boost::future<typename CiiOldbDataPointAsync<T>::OldbData> CiiOldbDataPointAsync<T>::WriteAsync(
160 OldbData&& new_data, bool is_disable_publishing) {
161
162 std::chrono::steady_clock::time_point t_0 {std::chrono::steady_clock::now()};
163
164 // We need a future/promise pair that is separate from any future we may obtain from running a thread.
165 // The reasons are
166 // (a) our lossy buffer, where we must "cancel" the future toward the client also
167 // when the data gets evicted from the buffer, without having attempted to write it to OLDB.
168 // See the SetDiscardListener call in our ctor.
169 // (b) the optimization that we only push the data to the buffer without starting a thread
170 // when there is already another thread busy with moving data from the buffer to the OLDB.
171 // The future will inform the client about any success or failure in writing the data to OLDB.
172 boost::promise<OldbData> promise {};
173 boost::future<OldbData> future = promise.get_future();
174
175 // TODO: handle is_disable_publishing
176
177 std::chrono::steady_clock::time_point t_1 {std::chrono::steady_clock::now()};
178
179 typename CiiOldbDataPointAsync<T>::OldbDataWithPromise new_data_with_promise {
180 // no std::move on the value, otherwise warning "moving a temporary object prevents copy elision"
181 new_data.GetValue(), new_data.GetTimestamp(), new_data.GetQuality(), std::move(promise)};
182
183 // Write data to the circular buffer.
184 // This discards older data if the buffer is full.
185 // Once the data is in the buffer, it may be processed immediately by a background
186 // thread that was started for the previously pushed data.
187 buffer.Push(std::move(new_data_with_promise));
188
189 std::chrono::steady_clock::time_point t_2 {std::chrono::steady_clock::now()};
190
191 if (new_data.GetValue()) {
192 LOG4CPLUS_TRACE(logger, "OLDB async (" << name << ") wrote to buffer: "
193 << *new_data_with_promise.GetValue() << ", threadId=" << std::this_thread::get_id());
194 } else {
195 LOG4CPLUS_TRACE(logger, "OLDB async (" << name << ") wrote to buffer: <no value>"
196 << ", threadId=" << std::this_thread::get_id());
197 }
198
199 // Trigger background processing of buffered data, unless we are already processing.
200 bool expected = false; // TODO: how to pass bool& as a 'false' literal, without declaring the variable?
201
202 std::chrono::steady_clock::time_point t_3 {};
203 std::chrono::steady_clock::time_point t_4 {};
204
205 // if is_async_processing == false then set it to true and execute the code block.
206 if (is_async_processing.compare_exchange_strong(expected, true)) {
207
208 LOG4CPLUS_TRACE(logger, LOG4CPLUS_TEXT("OLDB async (") << name << LOG4CPLUS_TEXT(") will use a writer thread."));
209
210 // Write the buffer content to the OLDB, using sync CII OLDB client API calls in a separate thread.
211 // New data can come in concurrently and will be included. The thread finishes when the buffer is fully drained.
212 //
213 // We use a shared thread pool to cut thread creation overhead and to centrally synchronize
214 // on thread completion during application shutdown (otherwise segfaults, see ETCS-629).
215 //
216 // Note that the externally managed thread pool resolves also an issue that we would otherwise have with using
217 // std::async. There would be the "waiting destructor" issue of the special version of future returned,
218 // see https://en.cppreference.com/w/cpp/thread/future/~future. The returned future
219 // runs out of scope before we return from this method. Thus this method itself waits for
220 // the background thread execution to finish, making OLDB access an unwanted sync call.
221 // This problem almost caused deprecation of std::async, see http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2013/n3780.pdf
222 // With std::thread we would have the same scope and blocking issue, unless we externally manage the single thread
223 // or we detach the thread. But then the C++ standard does not define what happens with detached threads when
224 // the application exits, and we cannot sync application shutdown on the detached thread. All messed up.
225 //
226 // Note on the number of threads and the task queue: In the Java prototype we used infinite maximumPoolSize
227 // (well, Integer.MAX_VALUE). Here in C++ with the much larger thread stack size and possibly other thread
228 // overhead, we use a quite limited number of threads.
229 // OLDB's underlying Redis is single-threaded per instance. The ECM installation uses 6 Redis instances,
230 // so that our 5 threads seem reasonable also from that perspective.
231 // The finite thread pool has an unlimited task queue in front.
232 // This should ensure that we return quickly from the 'post' call (where in reality it is sometimes very slow!)
233 // If new data arrives while a thread is already writing to OLDB, it will also include the new data.
234 // Thus it may happen that another thread will later find an empty data buffer. That should be almost no overhead
235 // thanks to reusing threads from a pool, and on the other hand it ensures that even when all threads are busy,
236 // no data will starve in the buffer.
237
238 t_3 = std::chrono::steady_clock::now();
239
240 // The lambda function helps to wrap our non-static member function WriteBufferToOldb as a std::function.
241 boost::asio::post(async_exec, [this](){ WriteBufferToOldb(); });
242
243 t_4 = std::chrono::steady_clock::now();
244
245 } else {
246 t_3 = t_4 = std::chrono::steady_clock::now();
247 // There is already a thread draining the buffer (writing to OLDB).
248 // Our data that we've pushed into the buffer will be considered by that other thread.
249 LOG4CPLUS_TRACE(logger, LOG4CPLUS_TEXT("OLDB async (") << name << LOG4CPLUS_TEXT(") will rely on running writer thread."));
250 }
251
252 LOG4CPLUS_TRACE(logger, "OLDB async (" << name << ") Returning from WriteValue");
253
254 int d1 = std::chrono::duration_cast<std::chrono::microseconds>(t_1 - t_0).count();
255 int d2 = std::chrono::duration_cast<std::chrono::microseconds>(t_2 - t_1).count();
256 int d3 = std::chrono::duration_cast<std::chrono::microseconds>(t_3 - t_2).count();
257 int d4 = std::chrono::duration_cast<std::chrono::microseconds>(t_4 - t_3).count();
258
259 if (d1 + d2 + d3 + d4 > 1000) {
260 LOG4CPLUS_DEBUG(logger, "CiiOldbDataPointAsync " << name << " WriteValue steps in micros: "
261 << d1 << ", " << d2 << ", " << d3 << ", " << d4);;
262 }
263 return future; // compiler will do copy elision or implied std::move
264}
265
266
// Drains the circular buffer on a thread-pool thread: polls entries one by one,
// writes each to the OLDB via the synchronous delegate calls, and fulfills (or
// fails) the promise attached to each entry. WriteAsync posts this task only
// after flipping is_async_processing false->true, so at most one drainer runs
// at a time; the flag is reset under the buffer lock before this task exits.
template<typename T>
void CiiOldbDataPointAsync<T>::WriteBufferToOldb() {

  LOG4CPLUS_TRACE(logger, LOG4CPLUS_TEXT("OLDB async (") << name << LOG4CPLUS_TEXT(") writer thread started. is_async_processing=") << is_async_processing <<
      ", threadId=" << std::this_thread::get_id()); // note that the remote logs contain threadId also in the "SourceID" field.

  int write_count = 0;
  std::optional<OldbDataWithPromise> data_to_write_opt {buffer.Poll()};

  if (!data_to_write_opt) {
    // There is the rare case that method WriteValue pushed new data to the buffer while a background thread
    // from a previous write was still draining the buffer, but that background write finishes before method WriteValue
    // checks variable is_async_processing. Then we get here but have an empty buffer.
    // All we need to do is restore this flag.
    is_async_processing.store(false);
  } else {
    while (data_to_write_opt.has_value()) {
      LOG4CPLUS_TRACE(logger, LOG4CPLUS_TEXT("OLDB async (") << name << ") thread got data and will call the sync writeValue.");

      // Take ownership of the polled entry (moves the promise out of the optional).
      OldbDataWithPromise data_to_write {std::move(data_to_write_opt.value())};

      try {
        auto start = std::chrono::steady_clock::now(); // TODO do we have a Stopwatch utility class?

        // make the synchronous OLDB call
        if (data_to_write.GetValue() && data_to_write.GetTimestamp() && data_to_write.GetQuality()) {
          // Complete record (produced by WriteValue): write value, timestamp and quality.
          delegate->WriteValue(*data_to_write.GetValue(), *data_to_write.GetTimestamp(), *data_to_write.GetQuality(), false);
          LOG4CPLUS_TRACE(logger, LOG4CPLUS_TEXT("OLDB async (") << name <<
              LOG4CPLUS_TEXT(") wrote DP to OLDB in ") << std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::steady_clock::now() - start)).count() <<
              " ms, data=" << LOG4CPLUS_TEXT(*data_to_write.GetValue()) ); // TODO log also time and quality?
        }
        else if (data_to_write.GetQuality()) {
          // Quality-only record (produced by SetQuality).
          delegate->SetQuality(*data_to_write.GetQuality());
          LOG4CPLUS_TRACE(logger, "OLDB async (" << name <<
              ") wrote DP quality to OLDB in " << std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::steady_clock::now() - start)).count() <<
              " ms, quality=" << hlcc::cpputil::CiiOldbDpQualityToString(*data_to_write.GetQuality()) );
        }
        else {
          // Neither a full record nor a quality-only record: nothing we can write.
          LOG4CPLUS_DEBUG(logger, LOG4CPLUS_TEXT("Programming error: OLDB async (") << name <<
              ") could not write DP because of missing data in OldbDataWithPromise");
        }

        // Fulfill the client's future. NOTE(review): set_value takes the OldbData
        // base, so moving data_to_write here presumably slices to the base and does
        // not self-move the promise member of the derived object -- TODO confirm.
        data_to_write.GetPromise()->set_value(std::move(data_to_write));
        write_count++;
      } catch (const elt::oldb::CiiOldbException& ex) {
        LOG4CPLUS_DEBUG(logger, LOG4CPLUS_TEXT("OLDB async (") << name << ") writing to OLDB failed: " << ex.what());
        try {
          // store exception in the promise/future
          data_to_write.GetPromise()->set_exception(boost::copy_exception(ex));
        } catch(...) {} // set_exception() may throw too
      } catch (...) {
        // TODO: should we let the application crash, or also set an ex on the promise?
        LOG4CPLUS_WARN(logger, LOG4CPLUS_TEXT("OLDB async (") << name << ") unknown exception.");
      }

      // Check if there is more data in the buffer that needs to be written to OLDB.
      // We lock the buffer so that toggling of flag is_async_processing is safe from
      // concurrent Push calls, which might otherwise lead to data starving in the buffer.
      { // scope for the lock
        auto lock = buffer.Lock();
        data_to_write_opt = buffer.Poll();
        if (!data_to_write_opt.has_value()) {
          is_async_processing.store(false); // from here on, newly written data will call WriteBufferToOldb again.
          // we'll exit from the while loop.
        }
      }
    }
    LOG4CPLUS_TRACE(logger, LOG4CPLUS_TEXT("OLDB async (") << name <<
        LOG4CPLUS_TEXT(") performed ") << write_count << LOG4CPLUS_TEXT(" DP writes in the background, now releasing writer thread."));
  }
}
338
339} // namespace hlcc::oldbmux
Requestor class header file.
CiiOldbDataPointAsync is a wrapper for class CiiOldbDataPoint, to allow asynchronous writes to the OLDB.
Definition ciiOldbDataPointAsync.hpp:53
CiiOldbDataPointAsync(std::string name, std::shared_ptr< elt::oldb::CiiOldbDataPoint< T > > delegate, boost::asio::thread_pool &async_exec, const log4cplus::Logger &logger, std::size_t buffer_capacity=1)
Constructor.
Definition ciiOldbDataPointAsync.ipp:62
std::shared_ptr< elt::oldb::CiiOldbDpValue< T > > ReadValue(bool check_bad_quality=true)
Definition ciiOldbDataPointAsync.ipp:128
std::string CiiOldbDpQualityToString(const elt::oldb::CiiOldbDpQuality &quality)
Definition ciiTypesToString.cpp:9
elt::mal::future< T > future
Definition actionsCommands.cpp:103
Definition ciiOldbDataPointAsync.hpp:34
std::ostream & operator<<(std::ostream &os, const std::vector< T > &vec)
Definition ciiOldbDataPointAsync.ipp:43
Value type for data for buffering and writing to OLDB. This subclass is used internally by CiiOldbDataPointAsync.
Definition ciiOldbDataPointAsync.hpp:215
boost::promise< CiiOldbDataPointAsync< T >::OldbData > * GetPromise()
Definition ciiOldbDataPointAsync.hpp:233
Value type for data for buffering and writing to OLDB. This base class is used as an interface toward...
Definition ciiOldbDataPointAsync.hpp:142
std::optional< T > GetValue() const
Definition ciiOldbDataPointAsync.hpp:175
std::optional< int64_t > GetTimestamp() const
Definition ciiOldbDataPointAsync.hpp:181
std::optional< elt::oldb::CiiOldbDpQuality > GetQuality() const
Definition ciiOldbDataPointAsync.hpp:187