RTC Toolkit  0.1.0-alpha
ddsPubThread.hpp
Go to the documentation of this file.
1 
8 #ifndef RTCTK_REUSABLECOMPONENT_TELREPUB_DDS_PUB_THREAD_HPP_
9 #define RTCTK_REUSABLECOMPONENT_TELREPUB_DDS_PUB_THREAD_HPP_
10 
11 //#include "numaUtils.h"
12 
13 #include <boost/thread/thread.hpp>
14 //#include <boost/lockfree/queue.hpp>
15 #include <boost/lockfree/spsc_queue.hpp>
16 #include <chrono>
17 #include <iostream>
18 
20 
21 #pragma GCC diagnostic push
22 #pragma GCC diagnostic ignored "-Wregister"
23 
24 #include "agnostictopicif.h"
25 #include "agnostictopicifSupport.h"
26 #include <ndds/ndds_cpp.h>
27 #include <ndds/ndds_namespace_cpp.h>
28 
29 #pragma GCC diagnostic pop
30 
31 using namespace std;
32 using namespace std::chrono;
33 
34 using namespace rtctk::componentFramework; // for logger?
35 
36 namespace rtctk::telRepub {
37 
42 uint32_t const MAX_TOPIC_SIZE = 55000; // to be on safe side not to exceed 64k
43 
47 struct topicT
48 {
49  std::array<unsigned char, MAX_TOPIC_SIZE> data;
50  std::size_t size;
52 };
53 
59 template <int Q_SIZE=3000>
61 {
62 public:
63  PubThreadBase(const char *topic_name)
64 {
65 
66  queue_m = new boost::lockfree::spsc_queue<topicT, boost::lockfree::capacity<Q_SIZE> >;
67  topic_name_m = topic_name;
68  rcvSamples_m = 0;
69  skipedSamples_m = 0;
70  lastSkipedSample_m = 0;
71 }
72 
73  virtual ~PubThreadBase()
74  {
75  if (skipedSamples_m>0)
76  LOG4CPLUS_INFO_FMT(GetLogger(), "[%s] skipped %lu samples out of %lu ratio: %f. Last @: %u\n",
77  topic_name_m.c_str(), skipedSamples_m, rcvSamples_m, static_cast<double>(skipedSamples_m)/rcvSamples_m, lastSkipedSample_m);
78  else
79  LOG4CPLUS_INFO_FMT(GetLogger(), "[%s] Received: %lu samples. No samples skipped\n", topic_name_m.c_str(), rcvSamples_m);
80  delete queue_m;
81  }
82 
87  inline virtual bool push(topicT const & value) {
88  rcvSamples_m++;
89  push_ret= queue_m->push(value);
90  this->cond.notify_one();
91  if (!push_ret)
92  {
93  skipedSamples_m++;
94  LOG4CPLUS_DEBUG_FMT(GetLogger(), "[%s] SampleId: %u overrun, so far skipped %lu samples. Last @: %u\n",
95  topic_name_m.c_str(), value.sampleId, skipedSamples_m, lastSkipedSample_m);
96  lastSkipedSample_m = value.sampleId;
97  }
98 
99  return push_ret;
100  }
101  //inline bool pop(T &value){ return queue_m.pop(value); }
102 
103  std::string getTopicName(){ return topic_name_m; }
104 
105 protected:
106  bool push_ret;
107  boost::lockfree::spsc_queue<topicT, boost::lockfree::capacity<Q_SIZE> > *queue_m;
108  unsigned long rcvSamples_m;
109  unsigned long skipedSamples_m;
111  std::string topic_name_m;
112  boost::mutex mut;
113  boost::condition_variable cond;
114 };
115 
119 
120 template <typename T=rtctk::AgnosticTopic, int Q_SIZE=3000>
121 class PubThread : public PubThreadBase<Q_SIZE>
122 {
123 public:
124  PubThread(typename T::DataWriter *dw) : PubThreadBase<Q_SIZE>(dw->get_topic()->get_name())
125  {
126  LOG4CPLUS_DEBUG_FMT(GetLogger(), "%s", __PRETTY_FUNCTION__);
127  m_data_writer = dw;
128  m_framecounter = 0;
129  m_loop= true;
130  }
131 
133  {
134  LOG4CPLUS_DEBUG_FMT(GetLogger(), "%s", __PRETTY_FUNCTION__);
135  }
136 
142  {
143  DDS::ReturnCode_t retcode;
144  high_resolution_clock::time_point start_time;
145 // duration<double> elapsed_time;
146  //boost::mutex::scoped_lock lock(this->mut);
147  boost::unique_lock<boost::mutex> lock(this->mut);
148  topicT value;
149 
150  while(m_loop)
151  {
152  this->cond.wait(lock);
153  while (this->queue_m->pop(value))
154  {
155  start_time = high_resolution_clock::now();
156  // copy (should be done in specialization function
157 // m_message.serialized_data.from_array(value.data.data(), value.data.size());
158  m_message.serialized_data.loan_contiguous(value.data.data(), value.size, value.data.max_size());
159  m_message.sample_id = value.sampleId;
160  retcode = m_data_writer->write(m_message, DDS::HANDLE_NIL);
161 
162  m_message.serialized_data.unloan();
163 
164  m_framecounter++;
165 
166  if( retcode != DDS::RETCODE_OK) {
167  if (retcode==DDS::RETCODE_TIMEOUT)
168  {
169  LOG4CPLUS_ERROR_FMT(GetLogger(), "[%s]SampleId: %u. DDS write timeout!", this->topic_name_m.c_str(), value.sampleId);
170  }else
171  {
172  LOG4CPLUS_ERROR_FMT(GetLogger(), "[%s]SampleId: %u. DDS failed to write. Return code: %d",
173  this->topic_name_m.c_str(), value.sampleId, retcode);
174  }
175  }
176 
177  if ((m_framecounter%2000) == 0)
178  {
179  if (this->skipedSamples_m>0)
180  LOG4CPLUS_DEBUG_FMT(GetLogger(), "[%s]SampleId: %u. Dropped: %lu (%f). Last @: %u\n",
181  this->topic_name_m.c_str(), value.sampleId,
182  this->skipedSamples_m, static_cast<double>(this->skipedSamples_m)/this->rcvSamples_m,
183  this->lastSkipedSample_m
184  );
185  else
186  LOG4CPLUS_DEBUG_FMT(GetLogger(), "[%s]SampleId: %u. No Drops so far.\n", this->topic_name_m.c_str(), value.sampleId);
187  //printNUMAmem(this->queue_m, "Queue");
188  }
189 /*
190  elapsed_time = high_resolution_clock::now()-start_time;
191  while (elapsed_time.count() < 0.001)
192  {
193  elapsed_time = high_resolution_clock::now()-start_time;
194  }
195 */
196 
197  }//while
198 
199 
200  }//while
201 
202  cout << "[" << this->topic_name_m << "] Flushing queue ..." << endl;
203  //flushing queue
204  while (this->queue_m->pop(value))
205  {
206  //m_message.serialized_data.from_array(value.data.data(), value.data.size());
207  m_message.serialized_data.loan_contiguous(value.data.data(), value.size, value.data.max_size());
208  m_message.sample_id = value.sampleId;
209  retcode = m_data_writer->write(m_message, DDS::HANDLE_NIL);
210  m_message.serialized_data.unloan();
211  m_framecounter++;
212  if ((m_framecounter%1) == 0)
213  {
214  LOG4CPLUS_DEBUG_FMT(GetLogger(), "[%s] Flushing SampleId: %u. Droped: %lu (%f). Last @ %u\n",
215  this->topic_name_m.c_str(), value.sampleId, this->skipedSamples_m,
216  static_cast<double>(this->skipedSamples_m)/this->rcvSamples_m,
217  this->lastSkipedSample_m
218  );
219  //printNUMAmem(this->queue_m, "Queue");
220  }
221  }//while
222 
223  }//readQueuePub
224 
228  void SimPub()
229  {
230  DDS::ReturnCode_t retcode;
231  unsigned int sampleId=0;
232  while(m_loop)
233  {
234  m_message.sample_id = sampleId;
235  //TBD: added some load to the sample
236  retcode = m_data_writer->write(m_message, DDS::HANDLE_NIL);
237 
238  m_framecounter++;
239 
240  if( retcode != DDS::RETCODE_OK) {
241  if (retcode==DDS::RETCODE_TIMEOUT)
242  {
243  std::cerr << "[" << this->topic_name_m << "] Timeout while sending " << m_framecounter << " sample" << std::endl;
244  }else
245  {
246  std::cerr << "[" << this->topic_name_m << "] Failed to send " << m_framecounter << " sample return code: " << retcode << std::endl;
247  }
248  }//if
249 
250  sampleId++;
251  usleep(m_simLoopSleep);
252  }//while
253  }//SimPub
254 
259  void CreateSimThread(u_int16_t f) {
260  m_simLoopSleep = 1000000/f;
261  m_thread = boost::thread(&PubThread<T, Q_SIZE>::SimPub, this);
262  std::string thrName="Spub"+this->topic_name_m;
263  pthread_setname_np(this->m_thread.native_handle(), thrName.c_str());
264  }
265 
269  void CreateThread() {
270  m_thread = boost::thread(&PubThread<T, Q_SIZE>::ReadQueuePub, this);
271  std::string thrName="Dpub"+this->topic_name_m;
272  pthread_setname_np(this->m_thread.native_handle(), thrName.c_str());
273  }
274 
278  void JoinThread() { m_loop =false; this->cond.notify_one(); m_thread.join(); }
279 
284  void SetAffinity(unsigned int cpu)
285  {
286  int ret;
287  cpu_set_t cpuset;
288  CPU_ZERO(&cpuset);
289  CPU_SET(cpu, &cpuset);
290  ret = pthread_setaffinity_np(m_thread.native_handle(), sizeof(cpuset), &cpuset);
291  if ( ret!=0 )
292  {
293  char thr_name[64];
294  pthread_getname_np(m_thread.native_handle(), thr_name, 64);
295  LOG4CPLUS_ERROR_FMT(GetLogger(), "%s ERROR set thread affinity for: %s. Error: %s(%d)", __PRETTY_FUNCTION__, thr_name, strerror(ret), ret);
296  }//if
297  }//SetAffinity
298 
300  {
301  cpu_set_t cpuset;
302  CPU_ZERO(&cpuset);
303 
304  pthread_getaffinity_np(m_thread.native_handle(), sizeof(cpuset), &cpuset);
305 
306  std::string af;
307  for (int j = 0; j < CPU_SETSIZE; j++)
308  if (CPU_ISSET(j, &cpuset)) { af+=std::to_string(j)+" ";}
309  char thr_name[64];
310  pthread_getname_np(m_thread.native_handle(), thr_name, 64);
311  LOG4CPLUS_ERROR_FMT(GetLogger(), " CPU affinity for thread: %s: %s", thr_name, af.c_str());
312  }//GetAffinity
313 
314 
316  {
317  cout << "wait_for_acknowledgments" << endl;
318  DDS::Duration_t to = {10, 0}; //10s
319  DDS::ReturnCode_t ret = m_data_writer->wait_for_acknowledgments(to);
320  if( ret != DDS::RETCODE_OK) {
321  std::cerr << " !Failed while waiting for acknowledgment of "
322  << "data being received by subscriptions, some data "
323  << "may not have been delivered." << std::endl;
324  }//if
325  }//wait_for_acknowledgments
326 
327 protected:
328  boost::thread m_thread;
329  bool m_loop;
330  unsigned long m_framecounter;
331 
333  typename T::DataWriter *m_data_writer;
334 
335  useconds_t m_simLoopSleep;
336 };
337 
338 } // closing namespace
339 
340 #endif
rtctk::telRepub::PubThread::m_thread
boost::thread m_thread
Definition: ddsPubThread.hpp:328
rtctk::telRepub::PubThread
Definition: ddsPubThread.hpp:122
rtctk::telRepub::PubThreadBase::cond
boost::condition_variable cond
Definition: ddsPubThread.hpp:113
rtctk::telRepub::PubThread::ReadQueuePub
void ReadQueuePub()
Definition: ddsPubThread.hpp:141
rtctk::telRepub::PubThread::SetAffinity
void SetAffinity(unsigned int cpu)
Definition: ddsPubThread.hpp:284
rtctk::telRepub::PubThreadBase::lastSkipedSample_m
uint32_t lastSkipedSample_m
Definition: ddsPubThread.hpp:110
rtctk::telRepub::PubThreadBase::mut
boost::mutex mut
Definition: ddsPubThread.hpp:112
rtctk::componentFramework
Definition: rtcComponent.hpp:17
rtctk::telRepub::PubThreadBase::queue_m
boost::lockfree::spsc_queue< topicT, boost::lockfree::capacity< Q_SIZE > > * queue_m
Definition: ddsPubThread.hpp:107
rtctk::telRepub::PubThreadBase::skipedSamples_m
unsigned long skipedSamples_m
Definition: ddsPubThread.hpp:109
rtctk::telRepub
Definition: ddsPub.cpp:12
rtctk::telRepub::PubThread::CreateSimThread
void CreateSimThread(u_int16_t f)
Definition: ddsPubThread.hpp:259
rtctk::telRepub::PubThread::~PubThread
~PubThread()
Definition: ddsPubThread.hpp:132
rtctk::telRepub::PubThread::m_data_writer
T::DataWriter * m_data_writer
Definition: ddsPubThread.hpp:333
rtctk::telRepub::PubThread::CreateThread
void CreateThread()
Definition: ddsPubThread.hpp:269
rtctk::telRepub::PubThread::SimPub
void SimPub()
Definition: ddsPubThread.hpp:228
rtctk::telRepub::PubThreadBase::topic_name_m
std::string topic_name_m
Definition: ddsPubThread.hpp:111
rtctk::telRepub::PubThread::m_framecounter
unsigned long m_framecounter
Definition: ddsPubThread.hpp:330
rtctk::componentFramework::GetLogger
log4cplus::Logger & GetLogger(const std::string &name="")
rtctk::telRepub::PubThreadBase::rcvSamples_m
unsigned long rcvSamples_m
Definition: ddsPubThread.hpp:108
rtctk::telRepub::PubThread::PrintAffinity
void PrintAffinity()
Definition: ddsPubThread.hpp:299
rtctk::telRepub::PubThreadBase::push_ret
bool push_ret
Definition: ddsPubThread.hpp:106
rtctk::telRepub::PubThreadBase::getTopicName
std::string getTopicName()
Definition: ddsPubThread.hpp:103
rtctk::telRepub::topicT::data
std::array< unsigned char, MAX_TOPIC_SIZE > data
Definition: ddsPubThread.hpp:49
rtctk::telRepub::PubThread::m_simLoopSleep
useconds_t m_simLoopSleep
Definition: ddsPubThread.hpp:335
rtctk::telRepub::topicT
Definition: ddsPubThread.hpp:48
rtctk::telRepub::PubThreadBase::PubThreadBase
PubThreadBase(const char *topic_name)
Definition: ddsPubThread.hpp:63
rtctk::telRepub::PubThreadBase::~PubThreadBase
virtual ~PubThreadBase()
Definition: ddsPubThread.hpp:73
rtctk::telRepub::PubThreadBase::push
virtual bool push(topicT const &value)
Definition: ddsPubThread.hpp:87
rtctk::telRepub::PubThread::PubThread
PubThread(typename T::DataWriter *dw)
Definition: ddsPubThread.hpp:124
rtctk::telRepub::topicT::size
std::size_t size
Definition: ddsPubThread.hpp:50
mudpi::uint32_t
unsigned int uint32_t
Definition: mudpi.h:16
rtctk::telRepub::PubThread::m_message
T m_message
Definition: ddsPubThread.hpp:332
std
Definition: mudpiProcessingError.hpp:109
rtctk::telRepub::topicT::sampleId
uint32_t sampleId
Definition: ddsPubThread.hpp:51
logger.hpp
Logging Support Library based on log4cplus.
rtctk::telRepub::MAX_TOPIC_SIZE
uint32_t const MAX_TOPIC_SIZE
Definition: ddsPubThread.hpp:42
rtctk::telRepub::PubThread::m_loop
bool m_loop
Definition: ddsPubThread.hpp:329
rtctk::telRepub::PubThread::JoinThread
void JoinThread()
Definition: ddsPubThread.hpp:278
rtctk::telRepub::PubThreadBase
Definition: ddsPubThread.hpp:61
rtctk::telRepub::PubThread::WaitForAcknowledgments
void WaitForAcknowledgments()
Definition: ddsPubThread.hpp:315