9 #ifndef RTCTK_STANDALONETOOLS_SHMPUB_H
10 #define RTCTK_STANDALONETOOLS_SHMPUB_H
13 #include <boost/program_options.hpp>
16 #include <cfitsio/fitsio.h>
19 #include <numapp/mempolicy.hpp>
20 #include <numapp/numapolicies.hpp>
21 #include <numapp/thread.hpp>
24 #include <ipcq/writer.hpp>
// Stop flag polled by the write loop: WriteToShm returns early once it is set.
// It starts out false; the code that sets it is not visible in this listing.
// NOTE(review): if it is written from a signal handler, a plain bool is not a
// signal-safe type (std::atomic<bool> or volatile std::sig_atomic_t would be)
// - confirm against the handler.
40 static bool g_stop =
false;
// Prints a notice on stdout that an exit signal was received.
// The enclosing function is not visible in this listing - presumably the
// signal handler that also raises g_stop; TODO confirm.
50 std::cout << std::endl <<
"Signal to exit received " << std::endl;
// Template parameters of the publisher:
//   TopicType   element type of the samples that are written to the queue.
//   WriterType  queue writer used for publishing; defaults to
//               ipcq::Writer<TopicType>.
79 template<
class TopicType,
class WriterType = ipcq::Writer<TopicType>>
// Constructor: describes the command line options, parses argc/argv with
// boost::program_options, echoes the resulting configuration on stdout and
// creates the queue writer (m_writer).
// Parameters:
//   argc, argv  command line as received by the program; handed unchanged to
//               command_line_parser.
// m_help_only is initialised to false.
// NOTE(review): several statements of the body (declaration of vm, the help
// early-exit, closing braces, trailing writer arguments) are not visible in
// this listing.
82 ShmPub(
int argc,
char* argv[]) : m_help_only(false)
84 using namespace boost::program_options;
// Option table. Each value<T>(&member) stores the parsed value directly in
// the named member; default_value() supplies the value used when the option
// is absent.
86 options_description desc(
"Allowed options");
88 (
"help,h",
"produce help message")
// Empty file name (the default) means: generate the data instead of reading it.
89 (
"fits-file,f", value<std::string>(&m_filename)->default_value(
""),
"fits input file: if not provided the app will generate data")
90 (
"queue-name,q", value<std::string>(&m_queue_name)->default_value(
"default_shm_queue"),
"shm queue name")
91 (
"queue-size,s", value<size_t>(&m_queue_size)->default_value(1000),
"size of the queue")
92 (
"sample-delay,d", value<int>(&m_sample_delay)->default_value(10),
"inter-sample delay in ms")
// numa-node deliberately has no default: its presence in vm is used below to
// decide whether a NUMA binding is requested at all.
93 (
"numa-node,n", value<int>(&m_numa),
"numa node for shm queue")
// 0 (the default) disables the periodic progress print in WriteToShm.
94 (
"print-every,p", value<int>(&m_print_every)->default_value(0),
"when to print to screen the number of sample written")
95 (
"gen-frames,g",value<int>(&m_gen_frames)->default_value(100),
"Number of frames to generate")
96 (
"repeat-mode,r",bool_switch(&m_repeat_mode),
"Repeat output when all samples are written");
// Parse the command line against the option table and store the result in vm.
99 store(command_line_parser(argc, argv).options(desc).run(), vm);
// --help given: print the option summary.
// (What follows the print is not visible here - presumably m_help_only is
// set and construction stops; TODO confirm.)
102 if (vm.count(
"help")) {
104 std::cout << desc << std::endl;
// Echo the effective configuration, one setting per line.
107 std::cout <<
"fits-file: " << m_filename << std::endl;
108 std::cout <<
"queue-name: " << m_queue_name << std::endl;
109 std::cout <<
"queue-size: " << m_queue_size << std::endl;
110 std::cout <<
"sample-delay: " << m_sample_delay << std::endl;
// m_numa is only meaningful (and only printed) when numa-node was supplied.
111 if(vm.count(
"numa-node")) {
112 std::cout <<
"numa-node: " << m_numa << std::endl;
114 std::cout <<
"print-every: " << m_print_every << std::endl;
115 std::cout <<
"gen-frames: " << m_gen_frames << std::endl;
116 std::cout <<
"repeat-mode: " << m_repeat_mode << std::endl;
// Create the writer. With numa-node given, a memory policy binding the queue
// to that node (numapp::MemPolicy::MakeBindNode) is passed along; the second
// make_unique call is the variant used otherwise. The remaining arguments of
// both calls are not visible in this listing.
118 if(vm.count(
"numa-node")) {
119 m_writer = std::make_unique<WriterType>(m_queue_name.c_str(),
121 numapp::MemPolicy::MakeBindNode(m_numa));
123 m_writer = std::make_unique<WriterType>(m_queue_name.c_str(),
// Body of the main routine: obtains the samples (from a FITS file or by
// generating them) and writes them to the shared memory queue.
// NOTE(review): the function header, the try keyword, the else branch and the
// condition guarding the throw are not visible in this listing.
// When m_help_only is set there is nothing to publish: return 0 right away.
143 if(m_help_only) {
return 0; }
// Samples to publish.
151 std::vector<TopicType> data;
// A non-empty file name selects FITS input ...
156 if(not m_filename.empty()) {
157 std::cout <<
"Reading data from FITS file: " << m_filename << std::endl;
// ... otherwise (presumably the else branch) the data is generated.
160 std::cout <<
"Generating data" << std::endl;
// Raised when no samples are available; the emptiness check itself is not
// visible here.
166 throw std::runtime_error(
"Data vector is not populated so will exit");
170 std::cout <<
"Writing data to shared memory queue" << std::endl;
// Any std::exception raised above is reported on stdout via what().
173 }
catch(std::exception
const& e) {
175 std::cout << e.what() << std::endl;
// Pause for two seconds; the reason is not visible in this listing
// (presumably to give readers time before shutdown - TODO confirm).
182 std::this_thread::sleep_for(std::chrono::seconds(2));
// Produces the samples to publish when they are generated rather than read
// from a file. Pure virtual: every concrete publisher has to implement it.
// Parameters:
//   num_frames  number of frames to generate (presumably the gen-frames
//               option value; the call site is not visible in this listing).
// Returns: the generated samples.
204 virtual std::vector<TopicType>
GenData(
int num_frames) = 0;
// Writes the given samples to the shared memory queue through m_writer,
// pacing them by m_sample_delay milliseconds and repeating the whole sequence
// for as long as m_repeat_mode is set (see the trailing while).
// Parameters:
//   data  samples to write; the sample_id member of each written element is
//         overwritten with the running write counter.
// Throws: std::runtime_error carrying the writer's error message.
// Returns early, without error, as soon as g_stop is set.
// NOTE(review): the do keyword, the check of err, and the updates of
// n_written and t_last are not visible in this listing.
217 void WriteToShm(std::vector<TopicType>& data)
219 using namespace std::chrono;
// Running count of written samples and the time stamps used for the
// progress report.
221 size_t n_written = 0;
222 auto t_sent = system_clock::now();
223 auto t_last = t_sent;
225 for(
auto& sample : data) {
// Stop requested: abandon the remaining samples.
226 if(g_stop) {
return; }
// Stamp the sample with the write counter and remember the send time.
227 sample.sample_id = n_written;
228 t_sent = system_clock::now();
// Publish the sample, notifying with ipcq::Notify::All.
229 std::error_code err = m_writer->Write(sample, ipcq::Notify::All);
// Error path (its guarding condition is not visible here).
231 throw std::runtime_error(
"Error writing to shm: " + err.message());
// Progress report every m_print_every samples; skipped for n_written == 0
// and switched off entirely when m_print_every is 0.
235 if(n_written && m_print_every && (n_written % m_print_every == 0)) {
// Time between t_last and the current send time, in milliseconds.
236 auto dur = duration_cast<milliseconds>(t_sent - t_last).count();
237 std::cout <<
"Samples written: " << n_written << std::endl;
238 std::cout <<
"Total time to write " << m_print_every <<
" : " << dur <<
" ms" << std::endl;
239 std::cout <<
"Average frame time: " << (float)dur/m_print_every <<
" ms" << std::endl;
// Pacing: wait until m_sample_delay ms after this sample's send time, so the
// time spent in Write counts towards the delay.
243 std::this_thread::sleep_until(t_sent + milliseconds(m_sample_delay));
// Replay the whole data set while repeat mode is on.
246 }
while(m_repeat_mode);
// Name of the shared memory queue (queue-name option).
249 std::string m_queue_name;
// FITS input file (fits-file option); empty when the data is generated.
251 std::string m_filename;
// Queue writer, created in the constructor.
260 std::unique_ptr<WriterType> m_writer;
// Fragment of a helper that reads one named table column from an open FITS
// file (fptr) into data using CFITSIO.
// NOTE(review): the function header, the declarations of status, repeat,
// width, nulval and d, and the status checks guarding the three error paths
// are not visible in this listing.
281 int col, typecode, anynul;
// Resolve the column number from its name; CASESEN requests a case-sensitive
// match.
285 fits_get_colnum(fptr, CASESEN, name, &col, &status);
// Error path: print the CFITSIO error report to stderr, then throw.
287 fits_report_error(stderr, status);
288 throw std::runtime_error(
"Error getting column: " + std::string(name));
// Query data type code, repeat count and width of the column.
291 fits_get_coltype(fptr, col, &typecode, &repeat, &width, &status);
293 fits_report_error(stderr, status);
294 throw std::runtime_error(
"Error getting coltype of:" + std::string(name));
// Dump the column description to stdout.
298 std::cout <<
"name: " << name << std::endl;
299 std::cout <<
"col: " << col << std::endl;
300 std::cout <<
"typecode: " << typecode << std::endl;
301 std::cout <<
"repeat: " << repeat << std::endl;
302 std::cout <<
"width: " << width << std::endl;
// Make room for every element of the column: repeat values per row, nrows rows.
307 data.resize(repeat*nrows);
// Read repeat*nrows elements starting at row 1, element 1 into d
// (d presumably points at the storage of data - TODO confirm).
309 fits_read_col(fptr, typecode, col, 1, 1, repeat*nrows, &nulval, d, &anynul, &status);
311 fits_report_error(stderr, status);
312 throw std::runtime_error(
"Error reading column: " + std::string(name));