RTC Toolkit  0.1.0-alpha
shmPub.hpp
Go to the documentation of this file.
1 
9 #ifndef RTCTK_STANDALONETOOLS_SHMPUB_H
10 #define RTCTK_STANDALONETOOLS_SHMPUB_H
11 
12 // arg parsing
13 #include <boost/program_options.hpp>
14 
15 // cfitsio
16 #include <cfitsio/fitsio.h>
17 
18 // include the numapp for threading
19 #include <numapp/mempolicy.hpp>
20 #include <numapp/numapolicies.hpp>
21 #include <numapp/thread.hpp>
22 
23 // include the ipcq for writer
24 #include <ipcq/writer.hpp>
25 
26 #include <iostream>
27 #include <vector>
28 #include <atomic>
29 #include <chrono>
30 #include <ctime>
31 #include <functional>
32 #include <numeric>
33 
40 static bool g_stop = false;
41 
48 void SignalHandler(int signal)
49 {
50  std::cout << std::endl << "Signal to exit received " << std::endl;
51  g_stop = true;
52 }
53 
54 
79 template<class TopicType, class WriterType = ipcq::Writer<TopicType>>
80 class ShmPub {
81  public:
82  ShmPub(int argc, char* argv[]) : m_help_only(false)
83  {
84  using namespace boost::program_options;
85 
86  options_description desc("Allowed options");
87  desc.add_options()
88  ("help,h", "produce help message")
89  ("fits-file,f", value<std::string>(&m_filename)->default_value(""), "fits input file: if not provided the app will generate data")
90  ("queue-name,q", value<std::string>(&m_queue_name)->default_value("default_shm_queue"), "shm queue name")
91  ("queue-size,s", value<size_t>(&m_queue_size)->default_value(1000), "size of the queue")
92  ("sample-delay,d", value<int>(&m_sample_delay)->default_value(10), "inter-sample delay in ms")
93  ("numa-node,n", value<int>(&m_numa), "numa node for shm queue")
94  ("print-every,p", value<int>(&m_print_every)->default_value(0), "when to print to screen the number of sample written")
95  ("gen-frames,g",value<int>(&m_gen_frames)->default_value(100), "Number of frames to generate")
96  ("repeat-mode,r",bool_switch(&m_repeat_mode), "Repeat output when all samples are written");
97 
98  variables_map vm;
99  store(command_line_parser(argc, argv).options(desc).run(), vm);
100  notify(vm);
101 
102  if (vm.count("help")) {
103  m_help_only = true;
104  std::cout << desc << std::endl;
105  }else {
106  m_help_only = false;
107  std::cout << "fits-file: " << m_filename << std::endl;
108  std::cout << "queue-name: " << m_queue_name << std::endl;
109  std::cout << "queue-size: " << m_queue_size << std::endl;
110  std::cout << "sample-delay: " << m_sample_delay << std::endl;
111  if(vm.count("numa-node")) {
112  std::cout << "numa-node: " << m_numa << std::endl;
113  }
114  std::cout << "print-every: " << m_print_every << std::endl;
115  std::cout << "gen-frames: " << m_gen_frames << std::endl;
116  std::cout << "repeat-mode: " << m_repeat_mode << std::endl;
117 
118  if(vm.count("numa-node")) {
119  m_writer = std::make_unique<WriterType>(m_queue_name.c_str(),
120  m_queue_size,
121  numapp::MemPolicy::MakeBindNode(m_numa));
122  }else {
123  m_writer = std::make_unique<WriterType>(m_queue_name.c_str(),
124  m_queue_size);
125  }
126  }
127  }
128 
129  virtual ~ShmPub() = default;
130 
141  int Run()
142  {
143  if(m_help_only) { return 0; }
144 
145  int ret_val = 0;
146 
147  try {
148 
149  signal(SIGINT,SignalHandler);
150 
151  std::vector<TopicType> data;
152 
153  // checks if filename has been indicated if it has loads data by calling the user
154  // overloaded function ReadFits if not provided calls the user overloaded function
155  // GenData
156  if(not m_filename.empty()) {
157  std::cout << "Reading data from FITS file: " << m_filename << std::endl;
158  data = ReadFits(m_filename);
159  }else {
160  std::cout << "Generating data" << std::endl;
161  data = GenData(m_gen_frames);
162  }
163 
164  // check to make sure m_data is populated
165  if(data.empty()) {
166  throw std::runtime_error("Data vector is not populated so will exit");
167  }
168 
169  // calls main loop
170  std::cout << "Writing data to shared memory queue" << std::endl;
171  WriteToShm(data);
172 
173  }catch(std::exception const& e) {
174 
175  std::cout << e.what() << std::endl;
176  ret_val = -1;
177  }
178 
179  // Close queue to signal and give readers time detach from queue
180  m_writer->Close();
181 #ifndef UNIT_TEST
182  std::this_thread::sleep_for(std::chrono::seconds(2));
183 #endif
184 
185  return ret_val;
186  }
187 
195  virtual std::vector<TopicType> ReadFits(std::string filename) = 0;
196 
204  virtual std::vector<TopicType> GenData(int num_frames) = 0;
205 
206  private:
217  void WriteToShm(std::vector<TopicType>& data)
218  {
219  using namespace std::chrono;
220 
221  size_t n_written = 0;
222  auto t_sent = system_clock::now();
223  auto t_last = t_sent;
224  do {
225  for(auto& sample : data) {
226  if(g_stop) { return; }
227  sample.sample_id = n_written;
228  t_sent = system_clock::now();
229  std::error_code err = m_writer->Write(sample, ipcq::Notify::All);
230  if(err) {
231  throw std::runtime_error("Error writing to shm: " + err.message());
232  }
233 
234  n_written++;
235  if(n_written && m_print_every && (n_written % m_print_every == 0)) {
236  auto dur = duration_cast<milliseconds>(t_sent - t_last).count();
237  std::cout << "Samples written: " << n_written << std::endl;
238  std::cout << "Total time to write " << m_print_every << " : " << dur << " ms" << std::endl;
239  std::cout << "Average frame time: " << (float)dur/m_print_every << " ms" << std::endl;
240  t_last = t_sent;
241  }
242 
243  std::this_thread::sleep_until(t_sent + milliseconds(m_sample_delay));
244  }
245 
246  }while(m_repeat_mode);
247  }
248 
249  std::string m_queue_name; //< Queue name to be used by the writer
250  size_t m_queue_size; //< number of position in shm queue
251  std::string m_filename; //< path to fits file being read
252  int m_sample_delay; //< delay between samples being writter (ms)
253  int m_numa; //< which numa node to provide writer
254 
255  int m_print_every; //< print status every N samples
256  int m_gen_frames; //< if generation data how many sample to be generated
257  bool m_repeat_mode; //< data will loop forever with an ever increasing sample_id
258  bool m_help_only; //< if help only will not enter writing loop
259 
260  std::unique_ptr<WriterType> m_writer; //< the ipcq writer
261 };
262 
277  template<class T>
278  std::vector<T> read_col_from_fits(fitsfile* fptr, char* name, long nrows, bool output=false)
279  {
280  int status = 0;
281  int col, typecode, anynul;
282  long repeat, width;
283  float nulval;
284 
285  fits_get_colnum(fptr, CASESEN, name, &col, &status);
286  if (status) {
287  fits_report_error(stderr, status);
288  throw std::runtime_error("Error getting column: " + std::string(name));
289  }
290 
291  fits_get_coltype(fptr, col, &typecode, &repeat, &width, &status);
292  if (status) {
293  fits_report_error(stderr, status);
294  throw std::runtime_error("Error getting coltype of:" + std::string(name));
295  }
296 
297  if(output) {
298  std::cout << "name: " << name << std::endl;
299  std::cout << "col: " << col << std::endl;
300  std::cout << "typecode: " << typecode << std::endl;
301  std::cout << "repeat: " << repeat << std::endl;
302  std::cout << "width: " << width << std::endl;
303  }
304 
305  // load in required data
306  std::vector<T> data;
307  data.resize(repeat*nrows); // we are assuming the vector to be matrix with row major.
308  T* d = data.data();
309  fits_read_col(fptr, typecode, col, 1, 1, repeat*nrows, &nulval, d, &anynul, &status);
310  if (status) {
311  fits_report_error(stderr, status);
312  throw std::runtime_error("Error reading column: " + std::string(name));
313  }
314  return data;
315  }
316 
317 }
318 #endif
rtctk::standaloneTools::ShmPub::ReadFits
virtual std::vector< TopicType > ReadFits(std::string filename)=0
rtctk::standaloneTools::ShmPub::Run
int Run()
Definition: shmPub.hpp:141
rtctk::standaloneTools::ShmPub
Definition: shmPub.hpp:80
rtctk::standaloneTools::ShmPub::ShmPub
ShmPub(int argc, char *argv[])
Definition: shmPub.hpp:82
rtctk::standaloneTools::read_col_from_fits
std::vector< T > read_col_from_fits(fitsfile *fptr, char *name, long nrows, bool output=false)
Definition: shmPub.hpp:278
rtctk::standaloneTools::ShmPub::GenData
virtual std::vector< TopicType > GenData(int num_frames)=0
rtctk::standaloneTools::ShmPub::~ShmPub
virtual ~ShmPub()=default
rtctk::standaloneTools
Definition: shmPub.hpp:34
rtctk::standaloneTools::SignalHandler
void SignalHandler(int signal)
Definition: shmPub.hpp:48
rtctkExampleDataTaskGenFitsData.filename
filename
Definition: rtctkExampleDataTaskGenFitsData.py:13