HLCC Documentation 2.2.0
Loading...
Searching...
No Matches
circularBufferConcurrent.ipp
Go to the documentation of this file.
1// SPDX-FileCopyrightText: 2020-2025 European Southern Observatory (ESO)
2//
3// SPDX-License-Identifier: LGPL-3.0-only
4/*
5 * circularBufferConcurrent.hpp
6 *
7 * Created on: Jul 7, 2021
8 * Author: hsommer
9 */
10
11#include <mutex>
12#include <condition_variable>
13#include <functional>
14
15#include <boost/circular_buffer.hpp>
16
17
18// HSO TODO: Check async calls in ProcessRequest in ~/git-rtc/rtctk/componentFramework/services/oldb/src/oldbAdapter.cpp
19
20
21namespace hlcc::oldbmux {
22
23template<typename T>
25 const std::string& name, std::size_t capacity, const log4cplus::Logger& logger)
26 : m_logger {logger},
27 cb {capacity},
28 mutex {},
29 buffer_not_empty {},
30 discard_listener {},
31 name {name}
32{
33 if (capacity < 1) {
34 // TODO what to throw? Java has throw new IllegalArgumentException("bufferCapacity must be >= 1.");
35 }
36}
37
38
39template<typename T>
43
44
45template<typename T>
47 {
48 std::lock_guard lck {mutex};
49
50 std::optional<T> data_old {};
51
52 if (cb.full() && discard_listener) {
53 data_old = std::move(cb.front()); // saves a copy
54 }
55
56 cb.push_back(std::move(data)); // discards the front data if buffer is full (i.e., assigns the new data to the old element).
58 if (data_old && discard_listener) {
59 discard_listener(*data_old);
60 }
61
62 // release lck before notifying the condition variable.
63 // If the thread waiting in PollBlocking wakes immediately, it doesn't then
64 // have to block again, waiting for us to unlock the mutex.
65 }
66 buffer_not_empty.notify_one();
67}
68
69
70template<typename T>
71std::optional<T> CircularBufferConcurrent<T>::Poll(std::chrono::milliseconds timeout) {
72 // condition variable needs to be able to lock and unlock the mutex.
73 // Our normal lock_guard doesn't allow this, thus we use unique_lock.
74 std::unique_lock lck {mutex};
75 // wait_for with predicate keeps looping after spurious wakeups.
76 // It yields the lock and re-acquires it after wakeup.
77 if (buffer_not_empty.wait_for(lck, timeout, [=](){return !cb.empty();})) {
78 // no timeout
79 T data = std::move(cb.front());
80 cb.pop_front();
81 return std::make_optional<T>(std::move(data));
82 } else {
83 // timeout
84 return {};
85 }
87
89template<typename T>
91 std::lock_guard lck {mutex};
92 cb.clear();
93}
94
96template<typename T>
98 std::lock_guard lck {mutex};
99 return cb.size();
100}
101
102
103template<typename T>
104const std::unique_ptr<std::scoped_lock<std::recursive_mutex>> CircularBufferConcurrent<T>::Lock() const {
105 return std::make_unique<std::scoped_lock<std::recursive_mutex>>(mutex);
106}
108
109template<typename T>
110void CircularBufferConcurrent<T>::SetDiscardListener(std::function<void(T&)> discard_listener) {
111 std::lock_guard lck {mutex};
112 this->discard_listener = discard_listener;
114
115template<typename T>
116boost::circular_buffer<T>& CircularBufferConcurrent<T>::GetCb() {
117 return cb;
118}
119
120
121} // end namespace hlcc::oldbmux
int Size() const
Definition circularBufferConcurrent.ipp:97
const std::unique_ptr< std::scoped_lock< std::recursive_mutex > > Lock() const
Definition circularBufferConcurrent.ipp:104
std::optional< T > Poll(std::chrono::milliseconds timeout=std::chrono::milliseconds::zero())
Gets the oldest element from the circular buffer.
Definition circularBufferConcurrent.ipp:71
CircularBufferConcurrent(const std::string &name, std::size_t capacity, const log4cplus::Logger &logger)
Definition circularBufferConcurrent.ipp:24
boost::circular_buffer< T > & GetCb()
Definition circularBufferConcurrent.ipp:116
~CircularBufferConcurrent()
Definition circularBufferConcurrent.ipp:40
void Clear()
Definition circularBufferConcurrent.ipp:90
void SetDiscardListener(std::function< void(T &)> discard_listener)
Definition circularBufferConcurrent.ipp:110
void Push(T &&data)
Adds new data to the circular buffer.
Definition circularBufferConcurrent.ipp:46
Definition ciiOldbDataPointAsync.hpp:34
ccsinsdetifllnetio::PointingKernelPositions data
Definition pkp_llnetio_subscriber.cpp:33