// Stream-insertion overload for std::vector<T>.
// NOTE: only part of this definition is visible in this listing (the template
// header and most of the body are not shown).
43inline std::ostream&
operator<<(std::ostream& os,
const std::vector<T>& vec) {
// Visit the elements by index.
45 for (std::size_t i = 0; i < vec.size(); ++i) {
// True for every element except the last one; presumably this is where a
// separator is written (the branch body is not visible here).
47 if (i != vec.size() - 1) {
// Constructor of CiiOldbDataPointAsync<T> (the beginning of the signature is not
// visible in this listing). Remaining parameters: the wrapped data point
// (delegate), the thread pool, the logger and the capacity handed to the buffer.
64 std::shared_ptr<elt::oldb::CiiOldbDataPoint<T>> delegate,
65 boost::asio::thread_pool& async_exec,
66 const log4cplus::Logger& logger,
67 std::size_t buffer_capacity )
// Member initializers: the buffer is constructed from name, capacity and logger;
// the is_async_processing flag starts out false.
71 buffer {name, buffer_capacity, logger},
72 async_exec {async_exec},
73 is_async_processing {false}
// Install the discard listener on the buffer. The lambda captures the members
// logger, name and buffer by reference and receives the discarded element.
// Presumably the buffer calls it whenever it drops an element - confirm against
// the buffer class.
76 buffer.SetDiscardListener(
77 [&logger = this->logger, &name = this->name, &buffer=this->buffer](
OldbDataWithPromise& discarded_data) {
// Take the buffer lock before touching the underlying circular buffer.
81 auto lock = buffer.Lock();
82 boost::circular_buffer<OldbDataWithPromise>& cb = buffer.GetCb();
// msg collects a description of the discarded element for the debug log below.
85 std::ostringstream msg {};
// Value: append it to the message and, if oldest_retained_data has no value of
// its own, copy the discarded value into it.
// NOTE: oldest_retained_data is declared on a line not visible in this listing;
// presumably it refers to an element of cb - confirm.
88 msg <<
"value=" << *discarded_data.GetValue();
89 if (!oldest_retained_data.GetValue()) {
90 oldest_retained_data.SetValue(*discarded_data.GetValue());
91 msg <<
" (transferred to retained newer data)";
// Timestamp: same treatment as the value above.
97 msg <<
"timestamp=" << *discarded_data.
GetTimestamp() <<
", ";
98 if (!oldest_retained_data.GetTimestamp()) {
99 oldest_retained_data.SetTimestamp(*discarded_data.
GetTimestamp());
100 msg <<
" (transferred to retained newer data)";
// Quality: quality_str is computed on a line not visible in this listing.
107 msg <<
"quality=" << quality_str;
// Report the discarded element at DEBUG level.
111 LOG4CPLUS_DEBUG(logger,
"OLDB async (" << name <<
") discarded data: " << msg.str());
// Complete the promise of the discarded element with std::out_of_range, so that
// whoever holds the matching future gets an exception instead of a value.
116 discarded_data.
GetPromise()->set_exception(boost::copy_exception(std::out_of_range{
"Dropped data."}));
// ReadValue forwards to the wrapped delegate data point and returns its result.
129 return delegate->
ReadValue(check_bad_quality);
141 const T& value, int64_t timestamp, elt::oldb::CiiOldbDpQuality quality,
bool is_disable_publishing) {
151 elt::oldb::CiiOldbDpQuality quality,
bool is_disable_publishing) {
// WriteValue overload taking already packed data (the beginning of the signature
// is not visible in this listing). It places the data in the buffer and makes
// sure a background task calling WriteBufferToOldb() is scheduled.
160 OldbData&& new_data,
bool is_disable_publishing) {
// t_0 .. t_4 are steady-clock timestamps taken between the steps below; they
// are only used for the timing report at the end.
162 std::chrono::steady_clock::time_point t_0 {std::chrono::steady_clock::now()};
// Promise/future pair: the promise is moved into the buffered element below.
// Presumably the future is what this function returns (the return statement is
// not visible here).
172 boost::promise<OldbData> promise {};
173 boost::future<OldbData> future = promise.get_future();
177 std::chrono::steady_clock::time_point t_1 {std::chrono::steady_clock::now()};
// Build the element to buffer from new_data's value, timestamp and quality plus
// the promise.
179 typename CiiOldbDataPointAsync<T>::OldbDataWithPromise new_data_with_promise {
181 new_data.GetValue(), new_data.GetTimestamp(), new_data.GetQuality(), std::move(promise)};
// Hand the element over to the buffer; new_data_with_promise is moved-from
// after this call.
187 buffer.Push(std::move(new_data_with_promise));
189 std::chrono::steady_clock::time_point t_2 {std::chrono::steady_clock::now()};
// Trace what was buffered.
// NOTE(review): the condition tests new_data, but the log statement below
// dereferences new_data_with_promise, which was std::move'd into buffer.Push
// above. Reading *new_data.GetValue() instead would avoid touching a moved-from
// object - confirm and fix.
191 if (new_data.GetValue()) {
192 LOG4CPLUS_TRACE(logger,
"OLDB async (" << name <<
") wrote to buffer: "
193 << *new_data_with_promise.GetValue() <<
", threadId=" << std::this_thread::get_id());
195 LOG4CPLUS_TRACE(logger,
"OLDB async (" << name <<
") wrote to buffer: <no value>"
196 <<
", threadId=" << std::this_thread::get_id());
// Atomically switch is_async_processing from false to true. Only the caller for
// which this exchange succeeds posts a new writer task.
200 bool expected =
false;
202 std::chrono::steady_clock::time_point t_3 {};
203 std::chrono::steady_clock::time_point t_4 {};
206 if (is_async_processing.compare_exchange_strong(expected,
true)) {
208 LOG4CPLUS_TRACE(logger, LOG4CPLUS_TEXT(
"OLDB async (") << name << LOG4CPLUS_TEXT(
") will use a writer thread."));
238 t_3 = std::chrono::steady_clock::now();
// Schedule WriteBufferToOldb() on the thread pool.
// NOTE(review): the lambda captures this, so this object has to stay alive
// until the posted task has run - confirm the owner guarantees that.
241 boost::asio::post(async_exec, [
this](){ WriteBufferToOldb(); });
243 t_4 = std::chrono::steady_clock::now();
// Presumably the else branch (its opening is not visible here): nothing is
// posted, so t_3 and t_4 get the same time and the last step measures as zero.
246 t_3 = t_4 = std::chrono::steady_clock::now();
249 LOG4CPLUS_TRACE(logger, LOG4CPLUS_TEXT(
"OLDB async (") << name << LOG4CPLUS_TEXT(
") will rely on running writer thread."));
252 LOG4CPLUS_TRACE(logger,
"OLDB async (" << name <<
") Returning from WriteValue");
// Durations of the four steps in microseconds.
// NOTE(review): count() is narrowed to int here.
254 int d1 = std::chrono::duration_cast<std::chrono::microseconds>(t_1 - t_0).count();
255 int d2 = std::chrono::duration_cast<std::chrono::microseconds>(t_2 - t_1).count();
256 int d3 = std::chrono::duration_cast<std::chrono::microseconds>(t_3 - t_2).count();
257 int d4 = std::chrono::duration_cast<std::chrono::microseconds>(t_4 - t_3).count();
// Report the step timings only when the total exceeds 1000 microseconds.
259 if (d1 + d2 + d3 + d4 > 1000) {
260 LOG4CPLUS_DEBUG(logger,
"CiiOldbDataPointAsync " << name <<
" WriteValue steps in micros: "
261 << d1 <<
", " << d2 <<
", " << d3 <<
", " << d4);;
// Takes elements from the buffer one by one, writes each to the delegate data
// point and completes the element's promise with the result. WriteValue posts
// this function to the thread pool.
// NOTE: several lines of this definition (e.g. the try, some else branches and
// closing braces) are not visible in this listing.
268void CiiOldbDataPointAsync<T>::WriteBufferToOldb() {
270 LOG4CPLUS_TRACE(logger, LOG4CPLUS_TEXT(
"OLDB async (") << name << LOG4CPLUS_TEXT(
") writer thread started. is_async_processing=") << is_async_processing <<
271 ", threadId=" << std::this_thread::get_id());
// Fetch the first element; an empty optional means the buffer had nothing.
// NOTE(review): unlike the poll further down, no buffer.Lock() is visible
// around this first poll and the flag reset - confirm this is intended.
274 std::optional<OldbDataWithPromise> data_to_write_opt {buffer.Poll()};
276 if (!data_to_write_opt) {
281 is_async_processing.store(
false);
// Keep writing for as long as a poll produced an element.
283 while (data_to_write_opt.has_value()) {
284 LOG4CPLUS_TRACE(logger, LOG4CPLUS_TEXT(
"OLDB async (") << name <<
") thread got data and will call the sync writeValue.");
// Move the element out of the optional.
286 OldbDataWithPromise data_to_write {std::move(data_to_write_opt.value())};
// Start time for the duration reported in the trace messages below.
289 auto start = std::chrono::steady_clock::now();
// Case 1: value, timestamp and quality are all present -> full write through
// the delegate (last argument passed as false).
292 if (data_to_write.GetValue() && data_to_write.GetTimestamp() && data_to_write.GetQuality()) {
293 delegate->WriteValue(*data_to_write.GetValue(), *data_to_write.GetTimestamp(), *data_to_write.GetQuality(),
false);
// NOTE(review): LOG4CPLUS_TEXT is applied to a non-literal expression in the
// last line of this statement - confirm this compiles in all build
// configurations.
294 LOG4CPLUS_TRACE(logger, LOG4CPLUS_TEXT(
"OLDB async (") << name <<
295 LOG4CPLUS_TEXT(
") wrote DP to OLDB in ") << std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::steady_clock::now() - start)).count() <<
296 " ms, data=" << LOG4CPLUS_TEXT(*data_to_write.GetValue()) );
// Case 2: only the quality is usable -> update just the quality.
298 else if (data_to_write.GetQuality()) {
299 delegate->SetQuality(*data_to_write.GetQuality());
300 LOG4CPLUS_TRACE(logger,
"OLDB async (" << name <<
301 ") wrote DP quality to OLDB in " << std::chrono::duration_cast<std::chrono::milliseconds>((std::chrono::steady_clock::now() - start)).count() <<
// Case 3 (presumably the final else): nothing writable; logged as a
// programming error.
305 LOG4CPLUS_DEBUG(logger, LOG4CPLUS_TEXT(
"Programming error: OLDB async (") << name <<
306 ") could not write DP because of missing data in OldbDataWithPromise");
// Complete the element's promise with the data.
309 data_to_write.GetPromise()->set_value(std::move(data_to_write));
311 }
// An OLDB exception is logged and passed on through the promise.
catch (
const elt::oldb::CiiOldbException& ex) {
312 LOG4CPLUS_DEBUG(logger, LOG4CPLUS_TEXT(
"OLDB async (") << name <<
") writing to OLDB failed: " << ex.what());
315 data_to_write.GetPromise()->set_exception(boost::copy_exception(ex));
// Presumably inside a further catch handler that is not visible here.
// NOTE(review): no completion of the promise is visible for this path in this
// listing - confirm.
319 LOG4CPLUS_WARN(logger, LOG4CPLUS_TEXT(
"OLDB async (") << name <<
") unknown exception.");
// Fetch the next element while holding the buffer lock, and clear
// is_async_processing when the buffer is empty. Presumably the lock keeps a
// concurrent push from slipping in between the empty poll and the flag reset -
// confirm that Push takes the same lock.
326 auto lock = buffer.Lock();
327 data_to_write_opt = buffer.Poll();
328 if (!data_to_write_opt.has_value()) {
329 is_async_processing.store(
false);
// Final trace; write_count is declared on a line not visible in this listing.
334 LOG4CPLUS_TRACE(logger, LOG4CPLUS_TEXT(
"OLDB async (") << name <<
335 LOG4CPLUS_TEXT(
") performed ") << write_count << LOG4CPLUS_TEXT(
" DP writes in the background, now releasing writer thread."));
Requestor class header file.
CiiOldbDataPointAsync is a wrapper for class CiiOldbDataPoint, to allow asynchronous writes to the OLDB.
Definition ciiOldbDataPointAsync.hpp:53
CiiOldbDataPointAsync(std::string name, std::shared_ptr< elt::oldb::CiiOldbDataPoint< T > > delegate, boost::asio::thread_pool &async_exec, const log4cplus::Logger &logger, std::size_t buffer_capacity=1)
Constructor.
Definition ciiOldbDataPointAsync.ipp:62
std::shared_ptr< elt::oldb::CiiOldbDpValue< T > > ReadValue(bool check_bad_quality=true)
Definition ciiOldbDataPointAsync.ipp:128
std::string CiiOldbDpQualityToString(const elt::oldb::CiiOldbDpQuality &quality)
Definition ciiTypesToString.cpp:9
elt::mal::future< T > future
Definition actionsCommands.cpp:103
Definition ciiOldbDataPointAsync.hpp:34
std::ostream & operator<<(std::ostream &os, const std::vector< T > &vec)
Definition ciiOldbDataPointAsync.ipp:43
Value type for data for buffering and writing to OLDB. This subclass is used internally by CiiOldbDataPointAsync.
Definition ciiOldbDataPointAsync.hpp:215
boost::promise< CiiOldbDataPointAsync< T >::OldbData > * GetPromise()
Definition ciiOldbDataPointAsync.hpp:233
Value type for data for buffering and writing to OLDB. This base class is used as an interface toward...
Definition ciiOldbDataPointAsync.hpp:142
std::optional< T > GetValue() const
Definition ciiOldbDataPointAsync.hpp:175
std::optional< int64_t > GetTimestamp() const
Definition ciiOldbDataPointAsync.hpp:181
std::optional< elt::oldb::CiiOldbDpQuality > GetQuality() const
Definition ciiOldbDataPointAsync.hpp:187