8 #ifndef RTCTK_REUSABLECOMPONENT_TELREPUB_DDS_PUB_THREAD_HPP_
9 #define RTCTK_REUSABLECOMPONENT_TELREPUB_DDS_PUB_THREAD_HPP_
13 #include <boost/thread/thread.hpp>
15 #include <boost/lockfree/spsc_queue.hpp>
21 #pragma GCC diagnostic push
22 #pragma GCC diagnostic ignored "-Wregister"
24 #include "agnostictopicif.h"
25 #include "agnostictopicifSupport.h"
26 #include <ndds/ndds_cpp.h>
27 #include <ndds/ndds_namespace_cpp.h>
29 #pragma GCC diagnostic pop
32 using namespace std::chrono;
49 std::array<unsigned char, MAX_TOPIC_SIZE>
data;
59 template <
int Q_SIZE=3000>
// Allocate the lock-free single-producer/single-consumer queue; its capacity is fixed
// at compile time by the Q_SIZE template parameter.
// NOTE(review): raw owning `new` — the matching `delete` is not visible in this excerpt; confirm it exists.
66 queue_m =
new boost::lockfree::spsc_queue<topicT, boost::lockfree::capacity<Q_SIZE> >;
// Remember the topic name; it prefixes the log messages in this file.
67 topic_name_m = topic_name;
// Initialise the last-skipped marker to 0.
70 lastSkipedSample_m = 0;
// Drop statistics: when at least one sample was skipped, log the skipped count, the
// received count, the skipped/received ratio and the last-skipped marker.
// NOTE(review): the ratio divides by rcvSamples_m; no guard against rcvSamples_m == 0 is visible on these lines.
75 if (skipedSamples_m>0)
76 LOG4CPLUS_INFO_FMT(
GetLogger(),
"[%s] skipped %lu samples out of %lu ratio: %f. Last @: %u\n",
77 topic_name_m.c_str(), skipedSamples_m, rcvSamples_m,
static_cast<double>(skipedSamples_m)/rcvSamples_m, lastSkipedSample_m);
// Message for the case where nothing was skipped — presumably the else-branch of the
// check above (the joining line is not visible in this excerpt).
79 LOG4CPLUS_INFO_FMT(
GetLogger(),
"[%s] Received: %lu samples. No samples skipped\n", topic_name_m.c_str(), rcvSamples_m);
// Enqueue the sample; push_ret receives whatever the queue's push() returns.
89 push_ret= queue_m->push(value);
// Wake one thread blocked on the condition variable so it can consume the queue.
90 this->cond.notify_one();
// Debug trace for an overrun: reports the sample id, the running skipped count and
// the last-skipped marker. The condition guarding this call is not visible here.
94 LOG4CPLUS_DEBUG_FMT(
GetLogger(),
"[%s] SampleId: %u overrun, so far skipped %lu samples. Last @: %u\n",
95 topic_name_m.c_str(), value.
sampleId, skipedSamples_m, lastSkipedSample_m);
107 boost::lockfree::spsc_queue<topicT, boost::lockfree::capacity<Q_SIZE> > *
queue_m;
113 boost::condition_variable
cond;
120 template <
typename T=rtctk::AgnosticTopic,
int Q_SIZE=3000>
126 LOG4CPLUS_DEBUG_FMT(
GetLogger(),
"%s", __PRETTY_FUNCTION__);
134 LOG4CPLUS_DEBUG_FMT(
GetLogger(),
"%s", __PRETTY_FUNCTION__);
// Locals of the publish loop: the return code of the most recent DDS write and a
// per-sample timestamp.
143 DDS::ReturnCode_t retcode;
144 high_resolution_clock::time_point start_time;
// Take the mutex that the condition variable below waits with.
147 boost::unique_lock<boost::mutex> lock(this->mut);
// Block until notified.
// NOTE(review): wait() is called without a predicate, so a spurious wake-up or a
// notification issued while this thread is not blocked here cannot be told apart;
// the pop loop below tolerates an empty queue.
152 this->cond.wait(lock);
// Drain everything currently queued; the loop ends when pop() returns false.
153 while (this->queue_m->pop(value))
// Timestamp taken before the sample is written.
155 start_time = high_resolution_clock::now();
// Lend the sample's byte array to the message instead of copying it: buffer pointer,
// value.size as the length and the array's max_size() as the maximum.
158 m_message.serialized_data.loan_contiguous(value.
data.data(), value.
size, value.
data.max_size());
159 m_message.sample_id = value.
sampleId;
// Publish the message through the DDS data writer.
160 retcode = m_data_writer->write(m_message, DDS::HANDLE_NIL);
// Hand the loaned buffer back so the message no longer refers to `value`.
162 m_message.serialized_data.unloan();
// Any return code other than RETCODE_OK is logged; a timeout has its own message.
166 if( retcode != DDS::RETCODE_OK) {
167 if (retcode==DDS::RETCODE_TIMEOUT)
169 LOG4CPLUS_ERROR_FMT(
GetLogger(),
"[%s]SampleId: %u. DDS write timeout!", this->topic_name_m.c_str(), value.
sampleId);
// Message carrying the numeric return code — presumably the else-branch for
// non-timeout failures (the joining line is not visible in this excerpt).
172 LOG4CPLUS_ERROR_FMT(
GetLogger(),
"[%s]SampleId: %u. DDS failed to write. Return code: %d",
173 this->topic_name_m.c_str(), value.
sampleId, retcode);
// Periodic statistics: emitted when m_framecounter is a multiple of 2000.
177 if ((m_framecounter%2000) == 0)
// With drops recorded, log the count, the dropped/received ratio and the last-skipped marker.
// NOTE(review): the ratio divides by rcvSamples_m; no zero guard is visible on these lines.
179 if (this->skipedSamples_m>0)
180 LOG4CPLUS_DEBUG_FMT(
GetLogger(),
"[%s]SampleId: %u. Dropped: %lu (%f). Last @: %u\n",
181 this->topic_name_m.c_str(), value.
sampleId,
182 this->skipedSamples_m,
static_cast<double>(this->skipedSamples_m)/this->rcvSamples_m,
183 this->lastSkipedSample_m
// Message for the no-drop case (the joining line is not visible in this excerpt).
186 LOG4CPLUS_DEBUG_FMT(
GetLogger(),
"[%s]SampleId: %u. No Drops so far.\n", this->topic_name_m.c_str(), value.
sampleId);
// Announce on stdout that the remaining queue content is about to be published.
202 cout <<
"[" << this->topic_name_m <<
"] Flushing queue ..." << endl;
// Publish every sample still queued; the loop ends when pop() returns false.
204 while (this->queue_m->pop(value))
// Same loan / write / unloan sequence as the regular publish path: the sample's byte
// array is lent to the message, written, then handed back.
207 m_message.serialized_data.loan_contiguous(value.
data.data(), value.
size, value.
data.max_size());
208 m_message.sample_id = value.
sampleId;
// NOTE(review): retcode is assigned here but no check of it is visible in this excerpt.
209 retcode = m_data_writer->write(m_message, DDS::HANDLE_NIL);
210 m_message.serialized_data.unloan();
// NOTE(review): `x % 1` is always 0, so this condition is always true and every
// flushed sample is logged.
212 if ((m_framecounter%1) == 0)
214 LOG4CPLUS_DEBUG_FMT(
GetLogger(),
"[%s] Flushing SampleId: %u. Droped: %lu (%f). Last @ %u\n",
215 this->topic_name_m.c_str(), value.
sampleId, this->skipedSamples_m,
216 static_cast<double>(this->skipedSamples_m)/this->rcvSamples_m,
217 this->lastSkipedSample_m
// Locals of the simulated publisher: DDS return code and a sample id starting at 0.
230 DDS::ReturnCode_t retcode;
231 unsigned int sampleId=0;
// Stamp the outgoing message with the current sample id and publish it.
234 m_message.sample_id = sampleId;
236 retcode = m_data_writer->write(m_message, DDS::HANDLE_NIL);
// Failures are reported on stderr (not through the logger used elsewhere in this
// file); a timeout has its own message.
240 if( retcode != DDS::RETCODE_OK) {
241 if (retcode==DDS::RETCODE_TIMEOUT)
243 std::cerr <<
"[" << this->topic_name_m <<
"] Timeout while sending " << m_framecounter <<
" sample" << std::endl;
// Message carrying the return code — presumably the else-branch for non-timeout
// failures (the joining line is not visible in this excerpt).
246 std::cerr <<
"[" << this->topic_name_m <<
"] Failed to send " << m_framecounter <<
" sample return code: " << retcode << std::endl;
// Pace the loop: sleep m_simLoopSleep microseconds between iterations.
251 usleep(m_simLoopSleep);
// Loop sleep interval derived from f; the value is later passed to usleep(), so it is
// in microseconds and f is presumably a rate in Hz — confirm against the caller.
// NOTE(review): if f is an integer, f == 0 divides by zero; no guard is visible here.
260 m_simLoopSleep = 1000000/f;
// Name the thread "Spub" followed by the topic name.
// NOTE(review): on Linux pthread_setname_np() fails with ERANGE for names longer than
// 15 characters plus the terminator; its return value is not checked on this line.
262 std::string thrName=
"Spub"+this->topic_name_m;
263 pthread_setname_np(this->m_thread.native_handle(), thrName.c_str());
// Name the thread "Dpub" followed by the topic name.
// NOTE(review): on Linux pthread_setname_np() fails with ERANGE for names longer than
// 15 characters plus the terminator; its return value is not checked on this line.
271 std::string thrName=
"Dpub"+this->topic_name_m;
272 pthread_setname_np(this->m_thread.native_handle(), thrName.c_str());
278 void JoinThread() { m_loop =
false; this->cond.notify_one(); m_thread.join(); }
// Add the requested CPU to the set and apply the set to the worker thread.
289 CPU_SET(cpu, &cpuset);
290 ret = pthread_setaffinity_np(m_thread.native_handle(),
sizeof(cpuset), &cpuset);
// Error report: fetch the thread's name (up to 64 bytes into thr_name) and log it with
// the textual and numeric error returned by pthread_setaffinity_np(). The check on
// `ret` that guards this is not visible in this excerpt.
294 pthread_getname_np(m_thread.native_handle(), thr_name, 64);
295 LOG4CPLUS_ERROR_FMT(
GetLogger(),
"%s ERROR set thread affinity for: %s. Error: %s(%d)", __PRETTY_FUNCTION__, thr_name, strerror(ret), ret);
// Read back the CPU set the worker thread is currently bound to.
// NOTE(review): the return value of pthread_getaffinity_np() is not checked on this line.
304 pthread_getaffinity_np(m_thread.native_handle(),
sizeof(cpuset), &cpuset);
// Build a space-separated list of every CPU index present in the set.
307 for (
int j = 0; j < CPU_SETSIZE; j++)
308 if (CPU_ISSET(j, &cpuset)) { af+=std::to_string(j)+
" ";}
// Log the thread name together with the collected CPU list.
// NOTE(review): emitted at ERROR level although the text describes no failure —
// confirm the level is intended.
310 pthread_getname_np(m_thread.native_handle(), thr_name, 64);
311 LOG4CPLUS_ERROR_FMT(
GetLogger(),
" CPU affinity for thread: %s: %s", thr_name, af.c_str());
// Trace on stdout that the acknowledgment wait is starting.
317 cout <<
"wait_for_acknowledgments" << endl;
// Wait up to 10 seconds (0 nanoseconds) for the data writer's samples to be acknowledged.
318 DDS::Duration_t to = {10, 0};
319 DDS::ReturnCode_t ret = m_data_writer->wait_for_acknowledgments(to);
// Any result other than RETCODE_OK is reported on stderr as possibly undelivered data;
// the return code itself is not printed.
320 if( ret != DDS::RETCODE_OK) {
321 std::cerr <<
" !Failed while waiting for acknowledgment of "
322 <<
"data being received by subscriptions, some data "
323 <<
"may not have been delivered." << std::endl;