API
xrif2shmim.hpp
Go to the documentation of this file.
1 /** \file xrif2shmim.hpp
2  * \brief The xrif2shmim class declaration and definition.
3  *
4  * \ingroup xrif2hmim_files
5  */
6 
7 #ifndef xrif2shmim_hpp
8 #define xrif2shmim_hpp
9 
#include <signal.h>
#include <unistd.h>

#include <ImageStreamIO/ImageStruct.h>
#include <ImageStreamIO/ImageStreamIO.h>

#include <xrif/xrif.h>

#include <mx/ioutils/fileUtils.hpp>
#include <mx/improc/eigenCube.hpp>
#include <mx/ioutils/fits/fitsFile.hpp>

#include <mx/sys/timeUtils.hpp>

#include "../../libMagAOX/libMagAOX.hpp"
22 
/// Sleep for a specified period in microseconds.
/** Blocks the calling thread using std::this_thread::sleep_for.
  *
  * \todo add timeutils to libMagAOX
  */
inline
void microsleep( unsigned usec /**< [in] the number of microseconds to sleep. */)
{
   std::this_thread::sleep_for(std::chrono::microseconds(usec));
}
31 
32 
33 /** \defgroup xrif2shmim xrif2shmim: xrif-archive Streamer
34  * \brief Stream images from an xrif archive to shared memory.
35  *
36  * <a href="../handbook/utils/xrif2shmim.html">Utility Documentation</a>
37  *
38  * \ingroup utils
39  *
40  */
41 
42 /** \defgroup xrif2hmim_files xrif2shmim Files
43  * \ingroup xrif2shmim
44  */
45 
/// Shutdown request flag. Set by sigTermHandler, polled by the loading and streaming loops.
/** Declared volatile sig_atomic_t because it is written from inside a signal handler.
  */
volatile sig_atomic_t g_timeToDie = false;

/// Signal handler which requests shutdown by setting g_timeToDie.
/** Only async-signal-safe operations are performed here: a raw write(2) of a
  * newline to stderr, and the store to the flag.
  */
inline
void sigTermHandler( int signum,      ///< [in] the signal number (unused)
                     siginfo_t *siginf, ///< [in] the signal information (unused)
                     void *ucont      ///< [in] the user context (unused)
                   )
{
   //Suppress those warnings . . .
   static_cast<void>(signum);
   static_cast<void>(siginf);
   static_cast<void>(ucont);

   //clear out the ^C char.  iostreams are not async-signal-safe, so use write(2).
   const ssize_t nw = write(STDERR_FILENO, "\n", 1);
   static_cast<void>(nw);

   g_timeToDie = true;
}
62 
63 /// A utility to stream MagaO-X images from xrif compressed archives to an ImageStreamIO stream.
64 /**
65  * \todo finish md doc for xrif2shmim
66  *
67  * \ingroup xrif2shmim
68  */
69 class xrif2shmim : public mx::app::application
70 {
71 protected:
72  /** \name Configurable Parameters
73  * @{
74  */
75 
76  std::string m_dir; ///< The directory to search for files. Can be empty if pull path given in files. If files is empty, all archives in dir will be used.
77 
78  std::vector<std::string> m_files; ///< List of files to use. If dir is not empty, it will be pre-pended to each name.
79 
80  size_t m_numFrames {0}; ///< The number of frames to store in memory. This defines how many different images will be streamed. If 0 (the default), all frames found using dir and files are loaded and stored.
81 
82  bool m_earliest {false}; ///< If true, then the earliest numFrames in the archive are used. By default (if not set) the latest numFrames are used.
83 
84  std::string m_shmimName {"xrif2shmim"}; ///< The name of the shared memory buffer to stream to. Default is "xrif2shmim".
85 
86  uint32_t m_circBuffLength {1}; ///< The length of the shared memory circular buffer. Default is 1.
87 
88  double m_fps {10}; ///< The rate, in frames per second, at which to stream images. Default is 10 fps.
89 
90  ///@}
91 
92 
93  xrif_t m_xrif {nullptr};
94 
95  /** \name Image Data
96  * @{
97  */
98 
99  uint32_t m_width {0}; ///< The width of the image.
100  uint32_t m_height {0}; ///< The height of the image.
101 
102  mx::improc::eigenCube<float> m_frames;
103 
104  uint8_t m_dataType; ///< The ImageStreamIO type code.
105 
106  size_t m_typeSize {0}; ///< The size of the type, in bytes. Result of sizeof.
107 
108 
109  IMAGE m_imageStream; ///< The ImageStreamIO shared memory buffer.
110 
111 
112  ///@}
113 
114 public:
115 
116  ~xrif2shmim();
117 
118  virtual void setupConfig();
119 
120  virtual void loadConfig();
121 
122  virtual int execute();
123 };
124 
125 inline
127 {
128  if(m_xrif)
129  {
130  xrif_delete(m_xrif);
131  }
132 }
133 
134 inline
136 {
137  config.add("dir","d", "dir" , argType::Required, "", "dir", false, "string", "The directory to search for files. Can be empty if pull path given in files.");
138  config.add("files","f", "files" , argType::Required, "", "files", false, "vector<string>", "List of files to use. If dir is not empty, it will be pre-pended to each name.");
139  config.add("numFrames","N", "numFrames" , argType::Required, "", "numFrames", false, "int", "The number of frames to store in memory. This defines how many different images will be streamed. If 0 (the default), all frames found using dir and files are loaded and stored.");
140  config.add("earliest","e", "earliest" , argType::True, "", "earliest", false, "bool", "If set or true, then the earliest numFrames in the archive are used. By default (if not set) the latest numFrames are used.");
141  config.add("shmimName","n", "shmimName" , argType::Required, "", "shmimName", false, "string", "The name of the shared memory buffer to stream to. Default is \"xrif2shmim\"");
142  config.add("circBuffLength","L", "circBuffLength" , argType::Required, "", "circBuffLength", false, "int", "The length of the shared memory circular buffer. Default is 1.");
143 
144  config.add("fps","F", "fps" , argType::Required, "", "fps", false, "float", "The rate, in frames per second, at which to stream images. Default is 10 fps.");
145 }
146 
147 inline
149 {
150  config(m_dir, "dir");
151  config(m_files, "files");
152  config(m_numFrames, "numFrames");
153  config(m_earliest, "earliest");
154  config(m_shmimName, "shmimName");
155  config(m_circBuffLength, "circBuffLength");
156  config(m_fps, "fps");
157 }
158 
159 inline
161 {
162  //Install signal handling
163  struct sigaction act;
164  sigset_t set;
165 
166  act.sa_sigaction = sigTermHandler;
167  act.sa_flags = SA_SIGINFO;
168  sigemptyset(&set);
169  act.sa_mask = set;
170 
171  errno = 0;
172  if( sigaction(SIGTERM, &act, 0) < 0 )
173  {
174  std::cerr << " (" << invokedName << "): error setting SIGTERM handler: " << strerror(errno) << "\n";
175  return -1;
176  }
177 
178  errno = 0;
179  if( sigaction(SIGQUIT, &act, 0) < 0 )
180  {
181  std::cerr << " (" << invokedName << "): error setting SIGQUIT handler: " << strerror(errno) << "\n";
182  return -1;
183  }
184 
185  errno = 0;
186  if( sigaction(SIGINT, &act, 0) < 0 )
187  {
188  std::cerr << " (" << invokedName << "): error setting SIGINT handler: " << strerror(errno) << "\n";
189  return -1;
190  }
191 
192  //Figure out which files to use
193  if(m_files.size() == 0)
194  {
195  if(m_dir == "")
196  {
197  m_dir = "./";
198  }
199 
200  m_files = mx::ioutils::getFileNames( m_dir, "", "", ".xrif");
201  }
202  else
203  {
204  if(m_dir != "")
205  {
206  if(m_dir[m_dir.size()-1] != '/') m_dir += '/';
207  }
208 
209  for(size_t n=0; n<m_files.size(); ++n)
210  {
211  m_files[n] = m_dir + m_files[n];
212  }
213  }
214 
215  if(m_files.size() == 0)
216  {
217  std::cerr << " (" << invokedName << "): No files found.\n";
218  return -1;
219  }
220 
221 
222  xrif_error_t rv;
223  rv = xrif_new(&m_xrif);
224 
225  if(rv < 0)
226  {
227  std::cerr << " (" << invokedName << "): Error allocating xrif.\n";
228  return -1;
229  }
230 
231  long st = 0;
232  long ed = m_files.size();
233  int stp = 1;
234 
235  if(m_numFrames != 0 && !m_earliest)
236  {
237  st = m_files.size()-1;
238  ed = -1;
239  stp = -1;
240  }
241 
242  char header[XRIF_HEADER_SIZE];
243 
244  size_t nframes = 0;
245 
246  //First get number of frames.
247  for(long n=st; n != ed; n += stp)
248  {
249  FILE * fp_xrif = fopen(m_files[n].c_str(), "rb");
250  size_t nr = fread(header, 1, XRIF_HEADER_SIZE, fp_xrif);
251  fclose(fp_xrif);
252  if(nr != XRIF_HEADER_SIZE)
253  {
254  std::cerr << " (" << invokedName << "): Error reading header of " << m_files[n] << "\n";
255  return -1;
256  }
257 
258  uint32_t header_size;
259  xrif_read_header(m_xrif, &header_size , header);
260 
261  if(n==st)
262  {
263  m_width = m_xrif->width;
264  m_height = m_xrif->height;
265  m_dataType = m_xrif->type_code;
266  }
267  else
268  {
269  if(m_xrif->width != m_width)
270  {
271  std::cerr << " (" << invokedName << "): width mis-match in " << m_files[n] << "\n";
272  return -1;
273  }
274  if(m_xrif->height != m_height)
275  {
276  std::cerr << " (" << invokedName << "): height mis-match in " << m_files[n] << "\n";
277  return -1;
278  }
279  if(m_xrif->type_code != m_dataType)
280  {
281  std::cerr << " (" << invokedName << "): data type mismatch in " << m_files[n] << "\n";
282  }
283  }
284 
285  if(m_xrif->depth != 1)
286  {
287  std::cerr << " (" << invokedName << "): Cubes detected in " << m_files[n] << "\n";
288  return -1;
289  }
290 
291  /*
292  if(m_dataType != XRIF_TYPECODE_INT16)
293  {
294  std::cerr << " (" << invokedName << "): Only 16-bit signed integerss (short) supported" << "\n";
295  return -1;
296  }
297  */
298 
299  nframes += m_xrif->frames;
300 
301  if(nframes >= m_numFrames && m_numFrames > 0)
302  {
303  ed = n + stp;
304  break;
305  }
306  }
307 
308  if(g_timeToDie != false)
309  {
310  std::cerr << " (" << invokedName << "): exiting.\n";
311  return -1;
312  }
313 
314  //Now record the actual number of frames
315  if(m_numFrames == 0 || nframes < m_numFrames) m_numFrames = nframes;
316 
317  std::cerr << " (" << invokedName << "): Reading " << m_numFrames << " frames in " << (ed-st)*stp << " file";
318  if( (ed-st)*stp > 1) std::cerr << "s";
319  std::cerr << "\n";
320 
321  //Allocate the storage
322  m_typeSize = xrif_typesize(m_dataType);
323 
325 
326  //Determine the order in which frames are copied
327  int findex = 0;
328  int fed = m_frames.planes();
329  if(stp == -1)
330  {
331  findex = m_frames.planes()-1;
332  fed = -1;
333  }
334 
335  //Now de-compress and load the frames
336  //Only decompressing the number of files needed, and only copying the number of frames needed
337  for(long n=st; n != ed; n += stp)
338  {
339  if(g_timeToDie == true) break; //check before going on
340 
341  FILE * fp_xrif = fopen(m_files[n].c_str(), "rb");
342  size_t nr = fread(header, 1, XRIF_HEADER_SIZE, fp_xrif);
343  if(nr != XRIF_HEADER_SIZE)
344  {
345  std::cerr << " (" << invokedName << "): Error reading header of " << m_files[n] << "\n";
346  fclose(fp_xrif);
347  return -1;
348  }
349 
350  uint32_t header_size;
351  xrif_read_header(m_xrif, &header_size , header);
352 
353  xrif_allocate_raw(m_xrif);
354  xrif_allocate_reordered(m_xrif);
355 
356  nr = fread(m_xrif->raw_buffer, 1, m_xrif->compressed_size, fp_xrif);
357  fclose(fp_xrif);
358 
359  if(g_timeToDie == true) break; //check after the long read.
360 
361  if(nr != m_xrif->compressed_size)
362  {
363  std::cerr << " (" << invokedName << "): Error reading data from " << m_files[n] << "\n";
364  return -1;
365  }
366 
367  xrif_decode(m_xrif);
368 
369  if(g_timeToDie == true) break; //check after the decompress.
370 
371  mx::improc::eigenCube<float> tmpc( (float*) m_xrif->raw_buffer, m_xrif->width, m_xrif->height, m_xrif->frames);
372 
373  //Determine the order in which frames in tmpc are read
374  long pst = 0;
375  long ped = tmpc.planes();
376  if(stp == -1)
377  {
378  pst = tmpc.planes()-1;
379  ped = -1;
380  }
381 
382  for( int p = pst; p != ped; p += stp)
383  {
384  m_frames.image(findex) = tmpc.image(p);
385  findex += stp;
386  if(findex == fed)
387  {
388  break;
389  }
390  }
391  }
392 
393  if(g_timeToDie != false)
394  {
395  std::cerr << " (" << invokedName << "): exiting.\n";
396  return -1;
397  }
398 
399  //De-allocate xrif
400  xrif_delete(m_xrif);
401  m_xrif = nullptr; //This is so destructor doesn't choke
402 
403  //Now create share memory stream.
404 
405  uint32_t imsize[3];
406  imsize[0] = m_width;
407  imsize[1] = m_height;
408  imsize[2] = m_circBuffLength;
409 
410  std::cerr << " (" << invokedName << "): Creating stream: " << m_shmimName << " (" << m_width << " x " << m_height << " x " << m_circBuffLength << ")\n";
411 
412  ImageStreamIO_createIm_gpu(&m_imageStream, m_shmimName.c_str(), 3, imsize, m_dataType, -1, 1, IMAGE_NB_SEMAPHORE, 0, CIRCULAR_BUFFER | ZAXIS_TEMPORAL, 0);
413 
414  m_imageStream.md->cnt1 = m_circBuffLength;
415 
416  //Begin streaming
417  uint64_t next_cnt1 = 0;
418  char * next_dest = (char *) m_imageStream.array.raw;
419  timespec * next_wtimearr = &m_imageStream.writetimearray[0];
420  timespec * next_atimearr = &m_imageStream.atimearray[0];
421  uint64_t * next_cntarr = &m_imageStream.cntarray[0];
422 
423  findex = 0;
424  double lastSend = mx::sys::get_curr_time();
425  double delta = 0;
426 
427  while(g_timeToDie == false)
428  {
429  m_imageStream.md->write=1;
430 
431  memcpy(next_dest, m_frames.image(findex).data(), m_width*m_height*m_typeSize);
432 
433  //Set the time of last write
434  clock_gettime(CLOCK_REALTIME, &m_imageStream.md->writetime);
435  m_imageStream.md->atime = m_imageStream.md->writetime;
436 
437  //Update cnt1
438  m_imageStream.md->cnt1 = next_cnt1;
439 
440  //Update cnt0
441  m_imageStream.md->cnt0++;
442 
443  *next_wtimearr = m_imageStream.md->writetime;
444  *next_atimearr = m_imageStream.md->atime;
445  *next_cntarr = m_imageStream.md->cnt0;
446 
447  //And post
448  m_imageStream.md->write=0;
449  ImageStreamIO_sempost(&m_imageStream,-1);
450 
451  //Now we increment pointers outside the time-critical part of the loop.
452  next_cnt1 = m_imageStream.md->cnt1+1;
453  if(next_cnt1 >= m_circBuffLength) next_cnt1 = 0;
454 
455  next_dest = (char *) m_imageStream.array.raw + next_cnt1*m_width*m_height*m_typeSize;
456  next_wtimearr = &m_imageStream.writetimearray[next_cnt1];
457  next_atimearr = &m_imageStream.atimearray[next_cnt1];
458  next_cntarr = &m_imageStream.cntarray[next_cnt1];
459 
460  ++findex;
461  if(findex >= m_frames.planes()) findex = 0;
462 
463 
464  double ct = mx::sys::get_curr_time();
465  delta += 0.1 * (ct-lastSend - 1.0/m_fps);
466  lastSend = ct;
467 
468 
469  if(1./m_fps - delta > 0) microsleep( (1./m_fps - delta)*1e6 ); //Argument is unsigned, since we can't unsleep, so don't pass a big number by axe.
470  }
471 
472  ImageStreamIO_destroyIm( &m_imageStream );
473 
474  std::cerr << " (" << invokedName << "): exited normally.\n";
475 
476  return 0;
477 }
478 
479 #endif //xrif2shmim_hpp
A utility to stream MagAO-X images from xrif compressed archives to an ImageStreamIO stream.
Definition: xrif2shmim.hpp:70
std::vector< std::string > m_files
List of files to use. If dir is not empty, it will be pre-pended to each name.
Definition: xrif2shmim.hpp:78
uint32_t m_height
The height of the image.
Definition: xrif2shmim.hpp:100
uint32_t m_width
The width of the image.
Definition: xrif2shmim.hpp:99
size_t m_typeSize
The size of the type, in bytes. Result of sizeof.
Definition: xrif2shmim.hpp:106
uint8_t m_dataType
The ImageStreamIO type code.
Definition: xrif2shmim.hpp:104
double m_fps
The rate, in frames per second, at which to stream images. Default is 10 fps.
Definition: xrif2shmim.hpp:88
size_t m_numFrames
The number of frames to store in memory. This defines how many different images will be streamed....
Definition: xrif2shmim.hpp:80
bool m_earliest
If true, then the earliest numFrames in the archive are used. By default (if not set) the latest numF...
Definition: xrif2shmim.hpp:82
IMAGE m_imageStream
The ImageStreamIO shared memory buffer.
Definition: xrif2shmim.hpp:109
std::string m_shmimName
The name of the shared memory buffer to stream to. Default is "xrif2shmim".
Definition: xrif2shmim.hpp:84
virtual void setupConfig()
Definition: xrif2shmim.hpp:135
std::string m_dir
The directory to search for files. Can be empty if full path given in files. If files is empty,...
Definition: xrif2shmim.hpp:76
xrif_t m_xrif
Definition: xrif2shmim.hpp:93
mx::improc::eigenCube< float > m_frames
Definition: xrif2shmim.hpp:102
virtual int execute()
Definition: xrif2shmim.hpp:160
virtual void loadConfig()
Definition: xrif2shmim.hpp:148
uint32_t m_circBuffLength
The length of the shared memory circular buffer. Default is 1.
Definition: xrif2shmim.hpp:86
std::ostream & cerr()
bool g_timeToDie
Definition: xrif2shmim.hpp:46
void sigTermHandler(int signum, siginfo_t *siginf, void *ucont)
Definition: xrif2shmim.hpp:48
void microsleep(unsigned usec)
Sleep for a specified period in microseconds.
Definition: xrif2shmim.hpp:27