API
streamWriter.hpp
Go to the documentation of this file.
1 /** \file streamWriter.hpp
2  * \brief The MagAO-X Image Stream Writer
3  *
4  * \author Jared R. Males (jaredmales@gmail.com)
5  *
6  * \ingroup streamWriter_files
7  */
8 
9 #ifndef streamWriter_hpp
10 #define streamWriter_hpp
11 
12 
13 #include <ImageStreamIO/ImageStruct.h>
14 #include <ImageStreamIO/ImageStreamIO.h>
15 
16 #include <xrif/xrif.h>
17 
18 #include <mx/sys/timeUtils.hpp>
19 
20 #include "../../libMagAOX/libMagAOX.hpp" //Note this is included on command line to trigger pch
21 #include "../../magaox_git_version.h"
22 
23 //#include "../../libMagAOX/app/MagAOXApp.hpp"
24 
25 #include "../../magaox_git_version.h"
26 
27 
28 
// Values of m_writing, the state variable that sequences the start and stop of writing.
29 #define NOT_WRITING (0) // Not saving. This is the initial value of m_writing.
30 #define START_WRITING (1) // A start was requested. The framegrabber loop changes this to WRITING on the next frame.
31 #define WRITING (2) // Saving is in progress.
32 #define STOP_WRITING (3) // A stop was requested. The framegrabber loop posts the writer semaphore when it sees this.
33 
34 namespace MagAOX
35 {
36 namespace app
37 {
38 
39 /** \defgroup streamWriter ImageStreamIO Stream Writing
40  * \brief Writes the contents of an ImageStreamIO image stream to disk.
41  *
42  * <a href="../handbook/operating/software/apps/streamWriter.html">Application Documentation</a>
43  *
44  * \ingroup apps
45  *
46  */
47 
48 /** \defgroup streamWriter_files ImageStreamIO Stream Writing
49  * \ingroup streamWriter
50  */
51 
52 /** MagAO-X application to control writing ImageStreamIO streams to disk.
53  *
54  * \ingroup streamWriter
55  *
56  */
57 class streamWriter : public MagAOXApp<>, public dev::telemeter<streamWriter>
58 {
60 
61  friend class dev::telemeter<streamWriter>;
62 
63  //Give the test harness access.
64  friend class streamWriter_test;
65 
66 protected:
67 
68  /** \name configurable parameters
69  *@{
70  */
71 
72  std::string m_rawimageDir; ///< The path where files will be saved.
73 
74  size_t m_circBuffLength {1024}; ///< The length of the circular buffer, in frames
75 
76  size_t m_writeChunkLength {512}; ///< The number of frames to write at a time
77 
78  std::string m_shmimName; ///< The name of the shared memory buffer.
79 
80  std::string m_outName; ///< The name to use for outputting files, Default is m_shmimName.
81 
82  int m_semaphoreNumber {7}; ///< The image structure semaphore index.
83 
84  unsigned m_semWait {500000000}; ///<The time in nsec to wait on the semaphore. Max is 999999999. Default is 5e8 nsec.
85 
86  int m_lz4accel {1}; ///< The LZ4 acceleration parameter (writer.lz4accel). Clamped to [XRIF_LZ4_ACCEL_MIN, XRIF_LZ4_ACCEL_MAX] in loadConfig().
87 
88  bool m_compress {true}; ///< Whether compression is used (writer.compress). If false the image data xrif handle is configured with no differencing, reordering, or compression.
89  ///@}
90 
91 
92  size_t m_width {0}; ///< The width of the image
93  size_t m_height {0}; ///< The height of the image
94  uint8_t m_dataType {0}; ///< The ImageStreamIO type code.
95  int m_typeSize {0}; ///< The pixel byte depth
96 
97  char * m_rawImageCircBuff {nullptr}; ///< The circular buffer of raw image data.
98  uint64_t * m_timingCircBuff {nullptr}; ///< The circular buffer of timing data, 5 uint64_t values per frame (cnt0, atime sec, atime nsec, writetime sec, writetime nsec).
99 
100  size_t m_currImage {0}; ///< The circular buffer position of the frame currently being filled by the framegrabber thread.
101 
102  //Writer book-keeping:
103  int m_writing {NOT_WRITING}; ///< Controls whether or not images are being written, and sequences start and stop of writing.
104 
105  uint64_t m_currChunkStart {0}; ///< The circular buffer starting position of the current to-be-written chunk.
106  uint64_t m_nextChunkStart {0}; ///< The circular buffer starting position of the next to-be-written chunk.
107 
108  uint64_t m_currSaveStart {0}; ///< The circular buffer position at which to start saving.
109  uint64_t m_currSaveStop {0}; ///< The circular buffer position at which to stop saving.
110 
111  bool m_logSaveStart {0}; ///< Flag indicating that the start-saving log entry should be made.
112  uint64_t m_currSaveStartFrameNo {0}; ///< The frame number of the image at which saving started (for logging)
113  uint64_t m_currSaveStopFrameNo {0}; ///< The frame number of the image at which saving stopped (for logging)
114 
115  ///The xrif compression handle for image data
116  xrif_t m_xrif {nullptr};
117 
118  ///Storage for the xrif image data file header
119  char * m_xrif_header {nullptr};
120 
121  ///The xrif handle for timing data (configured without differencing, reordering, or compression)
122  xrif_t m_xrif_timing {nullptr};
123 
124  ///Storage for the xrif timing data file header
125  char * m_xrif_timing_header {nullptr};
126 
127 
128 public:
129 
130  ///Default c'tor
131  streamWriter();
132 
133  ///Destructor
134  ~streamWriter() noexcept;
135 
136  /// Setup the configuration system (called by MagAOXApp::setup())
137  virtual void setupConfig();
138 
139  /// load the configuration system results (called by MagAOXApp::setup())
140  virtual void loadConfig();
141 
142  /// Startup functions
143  /** Sets up the INDI vars.
144  *
145  */
146  virtual int appStartup();
147 
148  /// Implementation of the FSM for the streamWriter application
149  virtual int appLogic();
150 
151 
152  /// Do any needed shutdown tasks. Joins the framegrabber and stream writer threads and deletes the xrif handles.
153  virtual int appShutdown();
154 
155 protected:
156 
157  /** \name SIGSEGV & SIGBUS signal handling
158  * These signals occur as a result of a ImageStreamIO source server resetting (e.g. changing frame sizes).
159  * When they occur a restart of the framegrabber and framewriter thread main loops is triggered.
160  *
161  * @{
162  */
163  bool m_restart {false}; ///< Flag which triggers a restart of the framegrabber main loop. Set by handlerSigSegv and by the framegrabber thread itself.
164 
165  static streamWriter * m_selfWriter; ///< Static pointer to this (set in constructor). Used for getting out of the static SIGSEGV handler.
166 
167  /// Initialize the xrif system.
168  /** Allocates the handles and headers pointers.
169  *
170  * \returns 0 on success.
171  * \returns -1 on error.
172  */
173  int initialize_xrif();
174 
175  ///Sets the handler for SIGSEGV and SIGBUS
176  /** These are caused by ImageStreamIO server resets.
177  */
178  int setSigSegvHandler();
179 
180  ///The handler called when SIGSEGV or SIGBUS is received, which will be due to ImageStreamIO server resets. Just a wrapper for handlerSigSegv.
181  static void _handlerSigSegv( int signum,
182  siginfo_t *siginf,
183  void *ucont
184  );
185 
186  ///Handles SIGSEGV and SIGBUS. Sets m_restart to true.
187  void handlerSigSegv( int signum,
188  siginfo_t *siginf,
189  void *ucont
190  );
191  ///@}
192 
193  /** \name Framegrabber Thread
194  * This thread monitors the ImageStreamIO buffer and copies its images to the circular buffer.
195  *
196  * @{
197  */
198  int m_fgThreadPrio {1}; ///< Priority of the framegrabber thread, should normally be > 0.
199 
200  std::string m_fgCpuset; ///< The cpuset for the framegrabber thread. Ignored if empty (the default).
201 
202  std::thread m_fgThread; ///< A separate thread for the actual framegrabbings
203 
204  bool m_fgThreadInit {true}; ///< Synchronizer to ensure f.g. thread initializes before doing dangerous things.
205 
206  pid_t m_fgThreadID {0}; ///< F.g. thread PID.
207 
208  pcf::IndiProperty m_fgThreadProp; ///< The property to hold the f.g. thread details.
209 
210  /// Worker function to allocate the circular buffers.
211  /** This takes place in the fg thread after connecting to the stream.
212  *
213  * \returns 0 on success.
214  * \returns -1 on error.
215  */
216  int allocate_circbufs();
217 
218  /// Worker function to configure and allocate the xrif handles.
219  /** This takes place in the fg thread after connecting to the stream.
220  *
221  * \returns 0 on success.
222  * \returns -1 on error.
223  */
224  int allocate_xrif();
225 
226 
227  ///Thread starter, called by fgThreadStart on thread construction. Calls fgThreadExec.
228  static void fgThreadStart( streamWriter * s /**< [in] a pointer to a streamWriter instance (normally this) */);
229 
230  /// Execute the frame grabber main loop.
231  void fgThreadExec();
232 
233  ///@}
234 
235  /** \name Stream Writer Thread
236  * This thread writes chunks of the circular buffer to disk.
237  *
238  * @{
239  */
240  int m_swThreadPrio {1}; ///< Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
241 
242  std::string m_swCpuset; ///< The cpuset for the stream writer thread. Ignored if empty (the default).
243 
244  sem_t m_swSemaphore; ///< Semaphore used to synchronize the fg thread and the sw thread.
245 
246  std::thread m_swThread; ///< A separate thread for the actual writing
247 
248  bool m_swThreadInit {true}; ///< Synchronizer to ensure s.w. thread initializes before doing dangerous things.
249 
250  pid_t m_swThreadID {0}; ///< S.w. thread pid.
251 
252  pcf::IndiProperty m_swThreadProp; ///< The property to hold the s.w. thread details.
253 
254  size_t m_fnameSz {0};
255 
256  char * m_fname {nullptr};
257 
258  std::string m_fnameBase;
259 
260  ///Thread starter, called by swThreadStart on thread construction. Calls swThreadExec.
261  static void swThreadStart( streamWriter * s /**< [in] a pointer to a streamWriter instance (normally this) */);
262 
263  /// Execute the stream writer main loop.
264  void swThreadExec();
265 
266  /// Function called when semaphore is raised to do the encode and write.
267  int doEncode();
268  ///@}
269 
270  //INDI:
271 protected:
272  //declare our properties
273  pcf::IndiProperty m_indiP_writing;
274 
275  pcf::IndiProperty m_indiP_xrifStats; ///< The "xrif" INDI property, which reports xrif compression performance.
276 
277 public:
279 
280  void updateINDI();
281 
282  /** \name Telemeter Interface
283  *
284  * @{
285  */
286  int checkRecordTimes();
287 
288  int recordTelem( const telem_saving_state * );
289 
290  int recordSavingState( bool force = false );
291  int recordSavingStats( bool force = false );
292 
293  ///@}
294 
295 };
296 
297 //Set self pointer to null so app starts up uninitialized.
299 
300 inline
301 streamWriter::streamWriter() : MagAOXApp(MAGAOX_CURRENT_SHA1, MAGAOX_REPO_MODIFIED)
302 {
303  m_powerMgtEnabled = false;
304 
305  m_selfWriter = this;
306 
307  return;
308 }
309 
310 inline
312 {
313  if(m_xrif) xrif_delete(m_xrif);
314 
315  if(m_xrif_header) free(m_xrif_header);
316 
317  if(m_xrif_timing) xrif_delete(m_xrif_timing);
318 
320 
321 
322  return;
323 }
324 
325 inline
327 {
328  config.add("writer.savePath", "", "writer.savePath", argType::Required, "writer", "savePath", false, "string", "The absolute path where images are saved. Will use MagAO-X default if not set.");
329 
330  config.add("writer.circBuffLength", "", "writer.circBuffLength", argType::Required, "writer", "circBuffLength", false, "size_t", "The length in frames of the circular buffer. Should be an integer multiple of and larger than writeChunkLength.");
331 
332  config.add("writer.writeChunkLength", "", "writer.writeChunkLength", argType::Required, "writer", "writeChunkLength", false, "size_t", "The length in frames of the chunks to write to disk. Should be smaller than circBuffLength.");
333 
334  config.add("writer.threadPrio", "", "writer.threadPrio", argType::Required, "writer", "threadPrio", false, "int", "The real-time priority of the stream writer thread.");
335 
336  config.add("writer.cpuset", "", "writer.cpuset", argType::Required, "writer", "cpuset", false, "int", "The cpuset for the writer thread.");
337 
338  config.add("writer.compress", "", "writer.compress", argType::Required, "writer", "compress", false, "bool", "Flag to set whether compression is used. Default true.");
339 
340  config.add("writer.lz4accel", "", "writer.lz4accel", argType::Required, "writer", "lz4accel", false, "int", "The LZ4 acceleration parameter. Larger is faster, but lower compression.");
341 
342  config.add("writer.outName", "", "writer.outName", argType::Required, "writer", "outName", false, "int", "The name to use for output files. Default is the shmimName.");
343 
344  config.add("framegrabber.shmimName", "", "framegrabber.shmimName", argType::Required, "framegrabber", "shmimName", false, "int", "The name of the stream to monitor. From /tmp/shmimName.im.shm.");
345 
346 
347  config.add("framegrabber.semaphoreNumber", "", "framegrabber.semaphoreNumber", argType::Required, "framegrabber", "semaphoreNumber", false, "int", "The semaphore to wait on. Default is 7.");
348 
349  config.add("framegrabber.semWait", "", "framegrabber.semWait", argType::Required, "framegrabber", "semWait", false, "int", "The time in nsec to wait on the semaphore. Max is 999999999. Default is 5e8 nsec.");
350 
351  config.add("framegrabber.threadPrio", "", "framegrabber.threadPrio", argType::Required, "framegrabber", "threadPrio", false, "int", "The real-time priority of the framegrabber thread.");
352 
353  config.add("framegrabber.cpuset", "", "framegrabber.cpuset", argType::Required, "framegrabber", "cpuset", false, "string", "The cpuset for the framegrabber thread.");
354 
355  telemeterT::setupConfig(config);
356 }
357 
358 
359 
/// Load the configuration system results.
/** Reads the writer.* and framegrabber.* options into the corresponding members,
  * clamps the LZ4 acceleration to [XRIF_LZ4_ACCEL_MIN, XRIF_LZ4_ACCEL_MAX], and loads
  * the telemeter configuration. A telemeter configuration error is logged as critical
  * and sets m_shutdown.
  *
  * NOTE(review): listing lines 361, 376 and 388 are absent here (the embedded numbering
  * skips them). Line 361 is the function signature; 376 and 388 presumably set the
  * defaults for m_outName and m_rawimageDir -- confirm against the repository.
  */
360 inline
362 {
363 
364 
365  config(m_circBuffLength, "writer.circBuffLength");
366  config(m_writeChunkLength, "writer.writeChunkLength");
367  config(m_swThreadPrio, "writer.threadPrio");
368  config(m_swCpuset, "writer.cpuset");
369  config(m_compress, "writer.compress");
370  config(m_lz4accel, "writer.lz4accel");
 //Keep the acceleration parameter inside the range defined by the xrif limits.
371  if(m_lz4accel < XRIF_LZ4_ACCEL_MIN) m_lz4accel = XRIF_LZ4_ACCEL_MIN;
372  if(m_lz4accel > XRIF_LZ4_ACCEL_MAX) m_lz4accel = XRIF_LZ4_ACCEL_MAX;
373 
374  config(m_shmimName, "framegrabber.shmimName");
375 
 //writer.outName overrides whatever m_outName holds at this point.
377  config(m_outName, "writer.outName");
378 
379  config(m_semaphoreNumber, "framegrabber.semaphoreNumber");
380  config(m_semWait, "framegrabber.semWait");
381 
382 
383  config(m_fgThreadPrio, "framegrabber.threadPrio");
384  config(m_fgCpuset, "framegrabber.cpuset");
385 
386  //Set some defaults
387  //Setup default log path
389  config(m_rawimageDir, "writer.savePath");
390 
 //The telemeter loads its own options; failure here shuts the app down.
391  if(telemeterT::loadConfig(config) < 0)
392  {
393  log<text_log>("Error during telemeter config", logPrio::LOG_CRITICAL);
394  m_shutdown = true;
395  }
396 }
397 
398 
399 
/// Startup function.
/** Creates the save directory (an already-existing directory is not an error), registers
  * the "xrif" statistics INDI property and its elements, installs the SIGSEGV/SIGBUS
  * handler, initializes the stream writer semaphore, initializes xrif, and starts the
  * telemeter.
  *
  * \returns 0 on success.
  * \returns -1 on the errors which are returned below.
  *
  * NOTE(review): listing lines 401, 419-420, 456, 465 and 470 are absent here (the embedded
  * numbering skips them): the signature, two statements after "set up the INDI properties",
  * and the conditions guarding the three brace blocks near the end. Judging by the
  * comments and messages these are the chunk-length check and the two thread starts --
  * confirm against the repository.
  */
400 inline
402 {
403  //Create save directory.
404  errno = 0;
405  if( mkdir(m_rawimageDir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) < 0 )
406  {
 //EEXIST just means the directory is already there.
407  if( errno != EEXIST)
408  {
409  std::stringstream logss;
410  logss << "Failed to create image directory (" << m_rawimageDir << "). Errno says: " << strerror(errno);
411  log<software_critical>({__FILE__, __LINE__, errno, 0, logss.str()});
412 
413  return -1;
414  }
415 
416  }
417 
418  // set up the INDI properties
421 
422 
423  //Register the stats INDI property
424  REG_INDI_NEWPROP_NOCB(m_indiP_xrifStats, "xrif", pcf::IndiProperty::Number);
425  m_indiP_xrifStats.setLabel("xrif compression performance");
426 
427  indi::addNumberElement<float>(m_indiP_xrifStats, "ratio", 0, 1.0, 0.0, "%0.2f", "Compression Ratio");
428 
429  indi::addNumberElement<float>(m_indiP_xrifStats, "differenceMBsec", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Differencing Rate [MB/sec]");
430 
431  indi::addNumberElement<float>(m_indiP_xrifStats, "reorderMBsec", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Reordering Rate [MB/sec]");
432 
433  indi::addNumberElement<float>(m_indiP_xrifStats, "compressMBsec", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Compression Rate [MB/sec]");
434 
435  indi::addNumberElement<float>(m_indiP_xrifStats, "encodeMBsec", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Total Encoding Rate [MB/sec]");
436 
437  indi::addNumberElement<float>(m_indiP_xrifStats, "differenceFPS", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Differencing Rate [f.p.s.]");
438 
439  indi::addNumberElement<float>(m_indiP_xrifStats, "reorderFPS", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Reordering Rate [f.p.s.]");
440 
441  indi::addNumberElement<float>(m_indiP_xrifStats, "compressFPS", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Compression Rate [f.p.s.]");
442 
443  indi::addNumberElement<float>(m_indiP_xrifStats, "encodeFPS", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Total Encoding Rate [f.p.s.]");
444 
445 
446  //Now set up the framegrabber and writer threads.
447  // - need SIGSEGV and SIGBUS handling for ImageStreamIO restarts
448  // - initialize the semaphore
449  // - start the threads
450 
451  if(setSigSegvHandler() < 0) return log<software_error, -1>({__FILE__, __LINE__});
452 
453  if(sem_init(&m_swSemaphore, 0,0) < 0) return log<software_critical, -1>({__FILE__, __LINE__, errno,0, "Initializing S.W. semaphore"});
454 
455  //Check if we have a safe writeChunkLength
457  {
458  return log<software_critical, -1>({__FILE__,__LINE__, "Write chunk length is not a divisor of circular buffer length."});
459  }
460 
461 
 //NOTE(review): the value of log<software_critical,-1> is discarded here, so a failed initialize_xrif()
 //does not abort startup, unlike the other checks in this function -- a `return` looks intended; confirm.
462  if(initialize_xrif() < 0) log<software_critical,-1>({__FILE__, __LINE__});
463 
464 
466  {
467  return log<software_critical,-1>({__FILE__, __LINE__});
468  }
469 
471  {
 //NOTE(review): as above, the error is logged but not returned -- confirm whether a `return` is intended.
472  log<software_critical,-1>({__FILE__, __LINE__});
473  }
474 
475  if(telemeterT::appStartup() < 0)
476  {
477  return log<software_error,-1>({__FILE__,__LINE__});
478  }
479 
480  return 0;
481 
482 }
483 
/// Implementation of the FSM for the streamWriter application.
/** Checks that the framegrabber and stream writer threads are still running, runs the
  * telemeter logic, and updates INDI.
  *
  * \returns 0 normally.
  * \returns -1 if either thread is found to have exited.
  *
  * NOTE(review): listing lines 485, 521, 524 and 527 are absent here (the embedded numbering
  * skips them): the signature, the bodies of the two switch cases, and the condition
  * guarding the telemeter block -- confirm against the repository.
  */
484 inline
486 {
487 
488  //first do a join check to see if other threads have exited.
489  //these will throw if the threads are really gone
490  try
491  {
 //pthread_tryjoin_np returns 0 only if the thread has already terminated.
492  if(pthread_tryjoin_np(m_fgThread.native_handle(),0) == 0)
493  {
494  log<software_error>({__FILE__, __LINE__, "framegrabber thread has exited"});
495  return -1;
496  }
497  }
498  catch(...)
499  {
 //NOTE(review): this branch guards the framegrabber thread, but the message names the streamwriter thread.
500  log<software_error>({__FILE__, __LINE__, "streamwriter thread has exited"});
501  return -1;
502  }
503 
504  try
505  {
506  if(pthread_tryjoin_np(m_swThread.native_handle(),0) == 0)
507  {
508  log<software_error>({__FILE__, __LINE__, "stream thread has exited"});
509  return -1;
510  }
511  }
512  catch(...)
513  {
514  log<software_error>({__FILE__, __LINE__, "streamwriter thread has exited"});
515  return -1;
516  }
517 
518  switch(m_writing)
519  {
520  case NOT_WRITING:
522  break;
523  default:
525  }
526 
528  {
529  if(telemeterT::appLogic() < 0)
530  {
 //NOTE(review): a telemeter error is logged but 0 is returned, and updateINDI() is skipped -- confirm this is intended.
531  log<software_error>({__FILE__, __LINE__});
532  return 0;
533  }
534  }
535 
536  updateINDI();
537 
538  return 0;
539 
540 }
541 
/// Do any needed shutdown tasks.
/** Joins the framegrabber and stream writer threads if they are joinable (any exception
  * from the join is swallowed), then deletes both xrif handles and clears the pointers.
  * The header buffers are not released here.
  *
  * \returns 0 always.
  *
  * NOTE(review): listing lines 543 (the signature) and 575 are absent here (the embedded
  * numbering skips them) -- confirm against the repository.
  */
542 inline
544 {
545  try
546  {
547  if(m_fgThread.joinable())
548  {
549  m_fgThread.join();
550  }
551  }
552  catch(...){}
553 
554  try
555  {
556  if(m_swThread.joinable())
557  {
558  m_swThread.join();
559  }
560  }
561  catch(...){}
562 
 //Clearing the pointer after deletion keeps the destructor from deleting the handle again.
563  if(m_xrif)
564  {
565  xrif_delete(m_xrif);
566  m_xrif=nullptr;
567  }
568 
569  if(m_xrif_timing)
570  {
571  xrif_delete(m_xrif_timing);
572  m_xrif_timing=nullptr;
573  }
574 
576 
577  return 0;
578 }
579 inline
581 {
582  xrif_error_t rv = xrif_new(&m_xrif);
583  if( rv != XRIF_NOERROR )
584  {
585  return log<software_critical, -1>({__FILE__,__LINE__, 0, rv, "xrif handle allocation or initialization error."});
586  }
587 
588  if(m_compress)
589  {
590  rv = xrif_configure(m_xrif, XRIF_DIFFERENCE_PREVIOUS, XRIF_REORDER_BYTEPACK, XRIF_COMPRESS_LZ4);
591  if( rv != XRIF_NOERROR )
592  {
593  return log<software_critical, -1>({__FILE__,__LINE__, 0, rv, "xrif handle configuration error."});
594  }
595  }
596  else
597  {
598  std::cerr << "not compressing . . . \n";
599  rv = xrif_configure(m_xrif, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE);
600  if( rv != XRIF_NOERROR )
601  {
602  return log<software_critical, -1>({__FILE__,__LINE__, 0, rv, "xrif handle configuration error."});
603  }
604 
605  }
606 
607  errno = 0;
608  m_xrif_header = (char *) malloc( XRIF_HEADER_SIZE * sizeof(char));
609  if(m_xrif_header == NULL)
610  {
611  return log<software_critical, -1>({__FILE__,__LINE__, errno, 0, "xrif header allocation failed."});
612  }
613 
614  rv = xrif_new(&m_xrif_timing);
615  if( rv != XRIF_NOERROR )
616  {
617  return log<software_critical, -1>({__FILE__,__LINE__, 0, rv, "xrif handle allocation or initialization error."});
618  }
619 
620  //m_xrif_timing->reorder_method = XRIF_REORDER_NONE;
621  rv = xrif_configure(m_xrif_timing, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE);
622  if( rv != XRIF_NOERROR )
623  {
624  return log<software_critical, -1>({__FILE__,__LINE__, 0, rv, "xrif handle configuration error."});
625  }
626 
627  errno = 0;
628  m_xrif_timing_header = (char *) malloc( XRIF_HEADER_SIZE * sizeof(char));
629  if(m_xrif_timing_header == NULL)
630  {
631  return log<software_critical, -1>({__FILE__,__LINE__, errno, 0, "xrif header allocation failed."});
632  }
633 
634  return 0;
635 }
636 
637 inline
639 {
640  struct sigaction act;
641  sigset_t set;
642 
643  act.sa_sigaction = &streamWriter::_handlerSigSegv;
644  act.sa_flags = SA_SIGINFO;
645  sigemptyset(&set);
646  act.sa_mask = set;
647 
648  errno = 0;
649  if( sigaction(SIGSEGV, &act, 0) < 0 )
650  {
651  std::string logss = "Setting handler for SIGSEGV failed. Errno says: ";
652  logss += strerror(errno);
653 
654  log<software_error>({__FILE__, __LINE__, errno, 0, logss});
655 
656  return -1;
657  }
658 
659  errno = 0;
660  if( sigaction(SIGBUS, &act, 0) < 0 )
661  {
662  std::string logss = "Setting handler for SIGBUS failed. Errno says: ";
663  logss += strerror(errno);
664 
665  log<software_error>({__FILE__, __LINE__, errno, 0,logss});
666 
667  return -1;
668  }
669 
670  log<text_log>("Installed SIGSEGV/SIGBUS signal handler.", logPrio::LOG_DEBUG);
671 
672  return 0;
673 }
674 
675 inline
677  siginfo_t *siginf,
678  void *ucont
679  )
680 {
681  m_selfWriter->handlerSigSegv(signum, siginf, ucont);
682 }
683 
684 inline
686  siginfo_t *siginf,
687  void *ucont
688  )
689 {
690  static_cast<void>(signum);
691  static_cast<void>(siginf);
692  static_cast<void>(ucont);
693 
694  m_restart = true;
695 
696  return;
697 }
698 
/// Worker function to allocate the circular buffers.
/** Releases any previous buffers and allocates new ones. The timing buffer holds
  * 5 uint64_t values for each of the m_circBuffLength frames.
  *
  * \returns 0 on success.
  * \returns -1 if either allocation fails, after logging at critical priority.
  *
  * NOTE(review): listing lines 700, 702 and 708 are absent here (the embedded numbering
  * skips them): the signature, the guard on the first free, and the allocation of
  * m_rawImageCircBuff, whose size is therefore not visible -- confirm against the repository.
  */
699 inline
701 {
703  {
704  free(m_rawImageCircBuff);
705  }
706 
707  errno = 0;
709 
710  if(m_rawImageCircBuff == NULL)
711  {
712  return log<software_critical,-1>({__FILE__,__LINE__, errno, 0, "buffer allocation failure"});
713  }
714 
715  if(m_timingCircBuff)
716  {
717  free(m_timingCircBuff);
718  }
719 
720  errno = 0;
 //5 timing values are stored per frame.
721  m_timingCircBuff = (uint64_t *) malloc( 5*sizeof(uint64_t)*m_circBuffLength);
722  if(m_timingCircBuff == NULL)
723  {
724  return log<software_critical,-1>({__FILE__,__LINE__, errno, 0, "buffer allocation failure"});
725  }
726 
727  return 0;
728 }
729 
730 inline
732 {
733  //Set up the image data xrif handle
734  xrif_error_t rv;
735 
736  if(m_compress)
737  {
738  rv = xrif_configure(m_xrif, XRIF_DIFFERENCE_PREVIOUS, XRIF_REORDER_BYTEPACK, XRIF_COMPRESS_LZ4);
739  if( rv != XRIF_NOERROR )
740  {
741  return log<software_critical,-1>({__FILE__,__LINE__, 0, rv, "xrif handle configuration error."});
742  }
743  }
744  else
745  {
746  std::cerr << "not compressing . . . \n";
747  rv = xrif_configure(m_xrif, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE);
748  if( rv != XRIF_NOERROR )
749  {
750  return log<software_critical,-1>({__FILE__,__LINE__, 0, rv, "xrif handle configuration error."});
751  }
752  }
753 
754  rv = xrif_set_size(m_xrif, m_width, m_height, 1, m_writeChunkLength, m_dataType);
755  if( rv != XRIF_NOERROR )
756  {
757  return log<software_critical,-1>({__FILE__,__LINE__, 0, rv, "xrif_set_size error."});
758  }
759 
760  rv = xrif_allocate_raw(m_xrif);
761  if( rv != XRIF_NOERROR )
762  {
763  return log<software_critical,-1>({__FILE__,__LINE__, 0, rv, "xrif_allocate_raw error."});
764  }
765 
766  rv = xrif_allocate_reordered(m_xrif);
767  if( rv != XRIF_NOERROR )
768  {
769  return log<software_critical,-1>({__FILE__,__LINE__, 0, rv, "xrif_allocate_reordered error."});
770  }
771 
772  //Set up the timing data xrif handle
773  rv = xrif_configure(m_xrif_timing, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE);
774  if( rv != XRIF_NOERROR )
775  {
776  return log<software_critical,-1>({__FILE__,__LINE__, 0, rv, "xrif handle configuration error."});
777  }
778 
779  rv = xrif_set_size(m_xrif_timing, 5,1,1, m_writeChunkLength, XRIF_TYPECODE_UINT64);
780  if( rv != XRIF_NOERROR )
781  {
782  return log<software_critical,-1>({__FILE__,__LINE__, 0, rv, "xrif_set_size error."});
783  }
784 
785  rv = xrif_allocate_raw(m_xrif_timing);
786  if( rv != XRIF_NOERROR )
787  {
788  return log<software_critical,-1>({__FILE__,__LINE__, 0, rv, "xrif_allocate_raw error."});
789  }
790 
791  rv = xrif_allocate_reordered(m_xrif_timing);
792  if( rv != XRIF_NOERROR )
793  {
794  return log<software_critical,-1>({__FILE__,__LINE__, 0, rv, "xrif_allocate_reordered error."});
795  }
796 
797  return 0;
798 }
799 
800 
801 inline
803 {
804  o->fgThreadExec();
805 }
806 
807 inline
809 {
810  m_fgThreadID = syscall(SYS_gettid);
811 
812  //Wait fpr the thread starter to finish initializing this thread.
813  while(m_fgThreadInit == true && m_shutdown == 0)
814  {
815  sleep(1);
816  }
817 
818  timespec missing_ts;
819 
820  IMAGE image;
821  ino_t inode =0; // The inode of the image stream file
822 
823  bool opened = false;
824 
825  while(m_shutdown == 0)
826  {
827  /* Initialize ImageStreamIO
828  */
829  opened = false;
830  m_restart = false; //Set this up front, since we're about to restart.
831 
832  sem_t * sem {nullptr}; ///< The semaphore to monitor for new image data
833 
834  int logged = 0;
835  while(!opened && !m_shutdown && !m_restart)
836  {
837  //b/c ImageStreamIO prints every single time, and latest version don't support stopping it yet, and that isn't thread-safe-able anyway
838  //we do our own checks. This is the same code in ImageStreamIO_openIm...
839  int SM_fd;
840  char SM_fname[200];
841  ImageStreamIO_filename(SM_fname, sizeof(SM_fname), m_shmimName.c_str());
842  SM_fd = open(SM_fname, O_RDWR);
843  if(SM_fd == -1)
844  {
845  if(!logged) log<text_log>("ImageStream " + m_shmimName + " not found (yet). Retrying . . .", logPrio::LOG_NOTICE);
846  logged = 1;
847  sleep(1); //be patient
848  continue;
849  }
850 
851  //Found and opened, close it and then use ImageStreamIO
852  logged = 0;
853  close(SM_fd);
854 
855  if( ImageStreamIO_openIm(&image, m_shmimName.c_str()) == 0)
856  {
857  if(image.md[0].sem <= m_semaphoreNumber) ///<\todo this isn't right--> isn't there a define in cacao to use?
858  {
859  ImageStreamIO_closeIm(&image);
860  mx::sys::sleep(1); //We just need to wait for the server process to finish startup.
861  }
862  else
863  {
864  opened = true;
865 
866  char SM_fname[200];
867  ImageStreamIO_filename(SM_fname, sizeof(SM_fname), m_shmimName.c_str());
868 
869  struct stat buffer;
870  int rv = stat(SM_fname, &buffer);
871 
872  if(rv != 0)
873  {
874  log<software_critical>({__FILE__,__LINE__, errno, "Could not get inode for " + m_shmimName + ". Source process will need to be restarted."});
875  ImageStreamIO_closeIm(&image);
876  return;
877  }
878  inode = buffer.st_ino;
879  }
880  }
881  else
882  {
883  mx::sys::sleep(1); //be patient
884  }
885  }
886 
887  if(m_restart) continue; //this is kinda dumb. we just go around on restart, so why test in the while loop at all?
888 
889  if(m_shutdown || !opened)
890  {
891  if(!opened) return;
892 
893  ImageStreamIO_closeIm(&image);
894  return;
895  }
896 
897  //now get a good semaphore
898  m_semaphoreNumber = ImageStreamIO_getsemwaitindex(&image, m_semaphoreNumber); //ask for semaphore we had before
899 
900  if(m_semaphoreNumber < 0)
901  {
902  log<software_critical>({__FILE__,__LINE__, "No valid semaphore found for " + m_shmimName + ". Source process will need to be restarted."});
903  return;
904  }
905 
906  log<software_info>({__FILE__,__LINE__, "got semaphore index " + std::to_string(m_semaphoreNumber) + " for " + m_shmimName });
907 
908  ImageStreamIO_semflush(&image, m_semaphoreNumber);
909 
910  sem = image.semptr[m_semaphoreNumber]; ///< The semaphore to monitor for new image data
911 
912  m_dataType = image.md[0].datatype;
913  m_typeSize = ImageStreamIO_typesize(m_dataType);
914  m_width = image.md[0].size[0];
915  m_height = image.md[0].size[1];
916  size_t length;
917  if(image.md[0].naxis == 3)
918  {
919  length = image.md[0].size[2];
920  }
921  else
922  {
923  length = 1;
924  }
925  std::cerr << "connected" << " " << m_width << "x" << m_height << "x" << (int) m_dataType << " (" << m_typeSize << ")" << std::endl;
926 
927  //Now allocate the circBuffs
928  if(allocate_circbufs() < 0) return; //will cause shutdown!
929 
930  // And allocate the xrifs
931  if(allocate_xrif() < 0) return; //Will cause shutdown!
932 
933  uint8_t atype;
934  size_t snx, sny, snz;
935 
936 
937  uint64_t curr_image; //The current cnt1 index
938  m_currImage = 0;
939  m_currChunkStart = 0;
940  m_nextChunkStart = 0;
941 
942  uint64_t last_cnt0 = ((uint64_t) -1);
943 
944  int cnt0flag = 0;
945 
946  //This is the main image grabbing loop.
947  while(!m_shutdown && !m_restart)
948  {
949  timespec ts;
950 
951  if(clock_gettime(CLOCK_REALTIME, &ts) < 0)
952  {
953  log<software_critical>({__FILE__,__LINE__,errno,0,"clock_gettime"});
954  return;
955  }
956 
957  mx::sys::timespecAddNsec(ts, m_semWait);
958 
959  if(sem_timedwait(sem, &ts) == 0)
960  {
961  if(image.md[0].naxis > 2) ///\todo change to naxis?
962  {
963  curr_image = image.md[0].cnt1;
964  }
965  else curr_image = 0;
966 
967  atype = image.md[0].datatype;
968  snx = image.md[0].size[0];
969  sny = image.md[0].size[1];
970  if(image.md[0].naxis == 3)
971  {
972  snz = image.md[0].size[2];
973  }
974  else
975  {
976  snz = 1;
977  }
978 
979  if( atype!= m_dataType || snx != m_width || sny != m_height || snz != length )
980  {
981  //**** close out here
982 
983  break; //exit the nearest while loop and get the new image setup.
984  }
985 
986  if(m_shutdown || m_restart) break; //Check for exit signals
987 
988  uint64_t new_cnt0;
989  if(image.cntarray)
990  {
991  new_cnt0 = image.cntarray[curr_image];
992  }
993  else
994  {
995  new_cnt0 = image.md[0].cnt0;
996  }
997 
998  if(new_cnt0 == last_cnt0 )
999  {
1000  log<text_log>("semaphore raised but cnt0 has not changed -- we're probably getting behind", logPrio::LOG_WARNING);
1001  ++cnt0flag;
1002  if(cnt0flag > 10) m_restart = true; //if we get here 10 times then something else is wrong.
1003  continue;
1004  }
1005 
1006  if(new_cnt0 - last_cnt0 > 1)
1007  {
1008  log<text_log>("cnt0 changed by more than 1. Frame skipped.", logPrio::LOG_WARNING);
1009  }
1010 
1011  cnt0flag = 0;
1012 
1013  last_cnt0 = new_cnt0;
1014 
1015 
1017  char * curr_src = (char *) image.array.raw + curr_image*m_width*m_height*m_typeSize;
1018 
1019  memcpy( curr_dest, curr_src , m_width*m_height*m_typeSize);
1020 
1021  uint64_t * curr_timing = m_timingCircBuff + 5*m_currImage;
1022 
1023  if(image.cntarray)
1024  {
1025  curr_timing[0] = image.cntarray[curr_image];
1026  curr_timing[1] = image.atimearray[curr_image].tv_sec;
1027  curr_timing[2] = image.atimearray[curr_image].tv_nsec;
1028  curr_timing[3] = image.writetimearray[curr_image].tv_sec;
1029  curr_timing[4] = image.writetimearray[curr_image].tv_nsec;
1030  }
1031  else
1032  {
1033  curr_timing[0] = image.md[0].cnt0;
1034  curr_timing[1] = image.md[0].atime.tv_sec;
1035  curr_timing[2] = image.md[0].atime.tv_nsec;
1036  curr_timing[3] = image.md[0].writetime.tv_sec;
1037  curr_timing[4] = image.md[0].writetime.tv_nsec;
1038  }
1039 
1040 
1041  //Check if we need to time-stamp ourselves -- for old cacao streams
1042  if(curr_timing[1] == 0)
1043  {
1044 
1045  if(clock_gettime(CLOCK_REALTIME, &missing_ts) < 0)
1046  {
1047  log<software_critical>({__FILE__,__LINE__,errno,0,"clock_gettime"});
1048  return;
1049  }
1050 
1051  curr_timing[1] = missing_ts.tv_sec;
1052  curr_timing[2] = missing_ts.tv_nsec;
1053  }
1054 
1055  //just set w-time to a-time if it's missing
1056  if(curr_timing[3] == 0)
1057  {
1058  curr_timing[3] = curr_timing[1];
1059  curr_timing[4] = curr_timing[2];
1060  }
1061 
1063  switch(m_writing)
1064  {
1065  case START_WRITING:
1068  m_writing = WRITING;
1069  m_currSaveStartFrameNo = new_cnt0;
1070  m_logSaveStart = true;
1071  // fall through
1072  case WRITING:
1074  {
1077  m_currSaveStopFrameNo = new_cnt0;
1078 
1079  //Now tell the writer to get going
1080  if(sem_post(&m_swSemaphore) < 0)
1081  {
1082  log<software_critical>({__FILE__, __LINE__, errno, 0, "Error posting to semaphore"});
1083  return;
1084  }
1085 
1088 
1090 
1091  }
1092  break;
1093 
1094  case STOP_WRITING:
1097  m_currSaveStopFrameNo = new_cnt0;
1098 
1099  //Now tell the writer to get going
1100  if(sem_post(&m_swSemaphore) < 0)
1101  {
1102  log<software_critical>({__FILE__, __LINE__, errno, 0, "Error posting to semaphore"});
1103  return;
1104  }
1105  break;
1106 
1107  default:
1108  break;
1109  }
1110 
1111  ++m_currImage;
1113 
1114 
1115  }
1116  else
1117  {
1118  //***** IF here, should check if "STOP_WRITING" set, and trigger the cleanup and close out of the current writing
1119 
1120  if(image.md[0].sem <= 0) break; //Indicates that the server has cleaned up.
1121 
1122  //Check for why we timed out
1123  if(errno == EINTR) break; //This will indicate time to shutdown, loop will exit normally flags set.
1124 
1125  //ETIMEDOUT just means we should wait more.
1126  //Otherwise, report an error.
1127  if(errno != ETIMEDOUT)
1128  {
1129  log<software_error>({__FILE__, __LINE__,errno, "sem_timedwait"});
1130  break;
1131  }
1132 
1133  //Check if the file has disappeared.
1134  int SM_fd;
1135  char SM_fname[200];
1136  ImageStreamIO_filename(SM_fname, sizeof(SM_fname), m_shmimName.c_str());
1137  SM_fd = open(SM_fname, O_RDWR);
1138  if(SM_fd == -1)
1139  {
1140  m_restart = true;
1141  }
1142  close(SM_fd);
1143 
1144  //Check if the inode changed
1145  struct stat buffer;
1146  int rv = stat(SM_fname, &buffer);
1147  if(rv != 0)
1148  {
1149  m_restart = true;
1150  }
1151 
1152  if(buffer.st_ino != inode)
1153  {
1154  m_restart = true;
1155  }
1156  }
1157  }
1158 
1159  ///\todo might still be writing here, so must check
1160  if(m_rawImageCircBuff)
1161  {
1162  free(m_rawImageCircBuff);
1163  m_rawImageCircBuff = 0;
1164  }
1165 
1166  if(m_timingCircBuff)
1167  {
1168  free(m_timingCircBuff);
1169  m_timingCircBuff = 0;
1170  }
1171 
1172  if(opened)
1173  {
1174  if(m_semaphoreNumber >= 0) image.semReadPID[m_semaphoreNumber] = 0; //release semaphore
1175  ImageStreamIO_closeIm(&image);
1176  opened = false;
1177  }
1178 
1179  } //outer loop, will exit if m_shutdown==true
1180 
1181  ///\todo might still be writing here, so must check
1182  //One more check
1183  if(m_rawImageCircBuff)
1184  {
1185  free(m_rawImageCircBuff);
1186  m_rawImageCircBuff = 0;
1187  }
1188 
1189  if(m_timingCircBuff)
1190  {
1191  free(m_timingCircBuff);
1192  m_timingCircBuff = 0;
1193  }
1194 
1195  if(opened)
1196  {
1197  if(m_semaphoreNumber >= 0) image.semReadPID[m_semaphoreNumber] = 0; //release semaphore
1198  ImageStreamIO_closeIm(&image);
1199  }
1200 }
1201 
1202 
// Entry point handed to the stream-writer thread: simply forwards to the
// swThreadExec() member of the streamWriter instance passed in as `s`.
// NOTE(review): the signature line is absent from this listing -- confirm against the original file.
1203 inline
1205 {
1206  s->swThreadExec();
1207 }
1208 
// Main loop of the stream-writer thread.
//
// Records the thread id, waits for the thread starter to clear m_swThreadInit, then
// loops until m_shutdown is set:
//   - idles (1 s sleeps) while the app state is neither READY nor OPERATING, releasing m_fname,
//   - (re)builds the file-name template in m_fname if needed,
//   - waits up to m_semWait nanoseconds on m_swSemaphore and calls doEncode() when it is posted.
// Returns early if clock_gettime fails or doEncode() reports an error; m_fname is freed on every exit path
// visible here (doEncode() frees it itself before returning its error).
// NOTE(review): the signature line is absent from this listing -- confirm against the original file.
1209 inline
1211 {
1212  m_swThreadID = syscall(SYS_gettid);
1213 
1214  //Wait for the thread starter to finish initializing this thread.
1215  while(m_swThreadInit == true && m_shutdown == 0)
1216  {
1217  sleep(1);
1218  }
1219 
1220 
1221  while(!m_shutdown)
1222  {
      //Idle until the app is READY or OPERATING. The file name is dropped here so that it is rebuilt below.
1223  while(!shutdown() && (!( state() == stateCodes::READY || state() == stateCodes::OPERATING) ) )
1224  {
1225  if(m_fname)
1226  {
1227  free(m_fname);
1228  m_fname = nullptr;
1229  }
1230  sleep(1);
1231  }
1232 
1233  if(shutdown()) break;
1234 
1235  //This will happen after a reconnection, and could update m_shmimName, etc.
1236  if(m_fname == nullptr)
1237  {
1238  m_fnameBase = m_rawimageDir + "/" + m_outName + "_";
1239 
1240  m_fnameSz = m_fnameBase.size() + sizeof("YYYYMMDDHHMMSSNNNNNNNNN.xrif"); //the sizeof includes the \0
1241  m_fname = (char*) malloc(m_fnameSz);
1242 
      //Fill in a placeholder timestamp; doEncode() overwrites the timestamp characters for each file.
      //NOTE(review): the malloc result is not checked before this snprintf.
1243  snprintf(m_fname, m_fnameSz, "%sYYYYMMDDHHMMSSNNNNNNNNN.xrif", m_fnameBase.c_str());
1244 
1245  }
1246 
1247  //at this point fname is not null.
1248 
1249  timespec ts;
1250 
1251  if(clock_gettime(CLOCK_REALTIME, &ts) < 0)
1252  {
1253  log<software_critical>({__FILE__,__LINE__,errno,0,"clock_gettime"});
1254 
1255  free(m_fname);
1256  m_fname = nullptr;
1257 
1258  return; //will trigger a shutdown
1259  }
1260 
      //Absolute deadline for the semaphore wait: now + m_semWait nanoseconds.
1261  mx::sys::timespecAddNsec(ts, m_semWait);
1262 
1263  if(sem_timedwait(&m_swSemaphore, &ts) == 0)
1264  {
      //A negative return from doEncode() ends this thread.
1265  if(doEncode() < 0) return;
1266  //Otherwise, success, and we just go on.
1267  }
1268  else
1269  {
1270  //Check for why we timed out
1271  if(errno == EINTR) continue; //This will probably indicate time to shutdown, loop will exit normally if flags set.
1272 
1273  //ETIMEDOUT just means we should wait more.
1274  //Otherwise, report an error.
1275  if(errno != ETIMEDOUT)
1276  {
1277  log<software_error>({__FILE__, __LINE__,errno, "sem_timedwait"});
1278  break;
1279  }
1280  }
1281  } //outer loop, will exit if m_shutdown==true
1282 
1283 
      //Release the file-name buffer on the way out.
1284  if(m_fname)
1285  {
1286  free(m_fname);
1287  m_fname = nullptr;
1288  }
1289 }
1290 
// Encode the frames between m_currSaveStart and m_currSaveStop and write them to a new xrif file.
//
// Does nothing (returns 0) when m_writing is NOT_WRITING. Otherwise it configures and encodes both the
// image handle (m_xrif) and the timing handle (m_xrif_timing), builds the file name from the acquisition
// time of the first frame being saved, and writes, in order: image header, compressed image data,
// timing header, compressed timing data.
//
// Returns 0 on completion (xrif and fwrite problems are logged but do not abort), or -1 if the output file
// cannot be opened, in which case m_fname has been freed and set to nullptr.
// NOTE(review): the signature line is absent from this listing -- confirm against the original file.
1291 inline
1293 {
1294  if(m_writing == NOT_WRITING) return 0;
1295 
      //Log the start of saving only once per saving sequence.
1296  if(m_logSaveStart)
1297  {
1298  log<saving_start>({1,m_currSaveStartFrameNo});
1299  m_logSaveStart = false;
1300  }
1301 
1302  recordSavingState(true);
1303 
      //NOTE(review): tw0 is captured below but is not used in the code visible here.
1304  timespec tw0, tw1, tw2;
1305 
1306  clock_gettime(CLOCK_REALTIME, &tw0);
1307 
1308  //Configure xrif and copy image data -- this does no allocations
1309  int rv = xrif_set_size(m_xrif, m_width, m_height, 1, (m_currSaveStop-m_currSaveStart), m_dataType);
1310  if(rv != XRIF_NOERROR)
1311  {
1312  //This is a big problem. Report it as "ALERT" and go on.
1313  log<software_alert>({__FILE__,__LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST"});
1314  }
1315 
1316  rv = xrif_set_lz4_acceleration(m_xrif, m_lz4accel);
1317  if(rv != XRIF_NOERROR)
1318  {
1319  //This may just be out of range, it's only an error.
1320  log<software_error>({__FILE__,__LINE__, 0, rv, "xrif set LZ4 acceleration error."});
1321  }
1322 
      //NOTE(review): the listing numbering skips a line here (1323) and no copy of image data into m_xrif
      //is visible in this view -- confirm against the original file.
1324 
1325  //Configure xrif and copy timing data -- no allocations
1326  rv = xrif_set_size(m_xrif_timing, 5, 1, 1, (m_currSaveStop-m_currSaveStart), XRIF_TYPECODE_UINT64);
1327  if(rv != XRIF_NOERROR)
1328  {
1329  //This is a big problem. Report it as "ALERT" and go on.
1330  log<software_alert>({__FILE__,__LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST."});
1331  }
1332 
1333  rv = xrif_set_lz4_acceleration(m_xrif_timing, m_lz4accel);
1334  if(rv != XRIF_NOERROR)
1335  {
1336  //This may just be out of range, it's only an error.
1337  log<software_error>({__FILE__,__LINE__, 0, rv, "xrif set LZ4 acceleration error."});
1338  }
1339 
      //Each frame has 5 uint64_t timing values, so offsets and sizes are scaled by 5.
1340  memcpy(m_xrif_timing->raw_buffer, m_timingCircBuff + m_currSaveStart*5, (m_currSaveStop-m_currSaveStart)*5*sizeof(uint64_t));
1341 
1342 
1343  rv = xrif_encode(m_xrif);
1344  if(rv != XRIF_NOERROR)
1345  {
1346  //This is a big problem. Report it as "ALERT" and go on.
1347  log<software_alert>({__FILE__,__LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST."});
1348  }
1349 
1350  rv = xrif_write_header( m_xrif_header, m_xrif);
1351  if(rv != XRIF_NOERROR)
1352  {
1353  //This is a big problem. Report it as "ALERT" and go on.
1354  log<software_alert>({__FILE__,__LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST."});
1355  }
1356 
1357  rv = xrif_encode(m_xrif_timing);
1358  if(rv != XRIF_NOERROR)
1359  {
1360  //This is a big problem. Report it as "ALERT" and go on.
1361  log<software_alert>({__FILE__,__LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST."});
1362  }
1363 
1364  rv = xrif_write_header( m_xrif_timing_header, m_xrif_timing);
1365  if(rv != XRIF_NOERROR)
1366  {
1367  //This is a big problem. Report it as "ALERT" and go on.
1368  log<software_alert>({__FILE__,__LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST"});
1369  }
1370 
1371  //Now break down the acq time of the first image in the buffer for use in file name
1372  tm uttime;//The broken down time.
      //Elements 1 and 2 of a timing record are read back as a timespec (tv_sec, tv_nsec).
      //NOTE(review): assumes timespec is laid out as two consecutive 64-bit fields -- confirm for the target platform.
1373  timespec * fts = (timespec *) (m_timingCircBuff + m_currSaveStart*5 +1);
1374 
      //NOTE(review): if gmtime_r fails, uttime is still used below without having been filled in.
1375  if(gmtime_r(&fts->tv_sec, &uttime) == 0)
1376  {
1377  //Yell at operator but keep going
1378  log<software_alert>({__FILE__,__LINE__,errno,0,"gmtime_r error. possible loss of timing information."});
1379  }
1380 
1381  //Available size = m_fnameSz-m_fnameBase.size(), rather than assuming sizeof("YYYYMMDDHHMMSSNNNNNNNNN"), in case we screwed up somewhere.
1382  rv = snprintf(m_fname + m_fnameBase.size(), m_fnameSz-m_fnameBase.size(), "%04i%02i%02i%02i%02i%02i%09i", uttime.tm_year+1900,
1383  uttime.tm_mon+1, uttime.tm_mday, uttime.tm_hour, uttime.tm_min, uttime.tm_sec, static_cast<int>(fts->tv_nsec));
1384 
1385  if(rv != sizeof("YYYYMMDDHHMMSSNNNNNNNNN")-1)
1386  {
1387  //Something is very wrong. Keep going to try to get it on disk.
1388  log<software_alert>({__FILE__,__LINE__, errno, rv, "did not write enough chars to timestamp"});
1389  }
1390 
1391  //Cover up the \0 inserted by snprintf
      //(23 is the length of "YYYYMMDDHHMMSSNNNNNNNNN", i.e. the position of the '.' in ".xrif")
1392  (m_fname + m_fnameBase.size())[23] = '.';
1393 
1394  clock_gettime(CLOCK_REALTIME, &tw1);
1395 
1396  FILE * fp_xrif = fopen(m_fname, "wb");
1397  if(fp_xrif == NULL)
1398  {
1399  //This is it. If we can't write data to disk need to fix.
1400  log<software_alert>({__FILE__,__LINE__,errno,0,"failed to open file for writing"});
1401 
1402  free(m_fname);
1403  m_fname = nullptr;
1404 
1405  return -1; //will trigger a shutdown
1406  }
1407 
      //Section 1: image header
1408  size_t bw = fwrite(m_xrif_header, sizeof(uint8_t), XRIF_HEADER_SIZE, fp_xrif);
1409 
1410  if(bw != XRIF_HEADER_SIZE)
1411  {
1412  log<software_alert>({__FILE__,__LINE__,errno, 0,"failure writing header to file. DATA LOSS LIKELY. bytes = " + std::to_string(bw)});
1413  //We go on . . .
1414  }
1415 
      //Section 2: compressed image data
1416  bw = fwrite(m_xrif->raw_buffer, sizeof(uint8_t), m_xrif->compressed_size, fp_xrif);
1417 
1418  if(bw != m_xrif->compressed_size)
1419  {
1420  log<software_alert>({__FILE__,__LINE__,errno,0,"failure writing data to file. DATA LOSS LIKELY. bytes = " + std::to_string(bw)});
1421  }
1422 
      //Section 3: timing header
1423  bw = fwrite(m_xrif_timing_header, sizeof(uint8_t), XRIF_HEADER_SIZE, fp_xrif);
1424 
1425  if(bw != XRIF_HEADER_SIZE)
1426  {
1427  log<software_alert>({__FILE__,__LINE__,errno, 0,"failure writing timing header to file. DATA LOSS LIKELY. bytes = " + std::to_string(bw)});
1428  }
1429 
      //Section 4: compressed timing data
1430  bw = fwrite(m_xrif_timing->raw_buffer, sizeof(uint8_t), m_xrif_timing->compressed_size, fp_xrif);
1431 
1432  if(bw != m_xrif_timing->compressed_size)
1433  {
1434  log<software_alert>({__FILE__,__LINE__,errno,0,"failure writing timing data to file. DATA LOSS LIKELY. bytes = " + std::to_string(bw)});
1435  }
1436 
      //NOTE(review): the return value of fclose is not checked, so a failure while flushing buffered data would go unreported.
1437  fclose(fp_xrif);
1438 
1439  clock_gettime(CLOCK_REALTIME, &tw2);
1440 
      //Elapsed wall-clock time, in seconds, from just before fopen to just after fclose.
1441  double wt = ( (double) tw2.tv_sec + ((double) tw2.tv_nsec)/1e9) - ( (double) tw1.tv_sec + ((double) tw1.tv_nsec)/1e9);
1442 
      //NOTE(review): this prints the write time to stderr on every call -- looks like leftover debug output.
1443  std::cerr << wt << "\n";
1444 
1445  recordSavingStats(true);
1446 
1447  if(m_writing == STOP_WRITING)
1448  {
      //NOTE(review): the listing numbering skips a line here (1449) -- confirm against the original file.
1450  log<saving_stop>({0,m_currSaveStopFrameNo});
1451  }
1452 
1453  recordSavingState(true);
1454 
1455  return 0;
1456 }
1457 
1458 INDI_NEWCALLBACK_DEFN(streamWriter, m_indiP_writing)(const pcf::IndiProperty &ipRecv)
1459 {
1460  INDI_VALIDATE_CALLBACK_PROPS( m_indiP_writing, ipRecv);
1461 
1462  if(!ipRecv.find("toggle")) return 0;
1463 
1464  if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::Off && (m_writing == WRITING || m_writing == START_WRITING))
1465  {
1466  m_writing = STOP_WRITING;
1467  }
1468 
1469  if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::On && m_writing == NOT_WRITING)
1470  {
1471  m_writing = START_WRITING;
1472  }
1473  return 0;
1474 }
1475 
// Publish the writing state and the xrif statistics over INDI.
//
// Nothing is updated while m_writing is in a transitional value (START_WRITING or STOP_WRITING).
// While WRITING with a non-null m_xrif, the toggle is set On and the compression ratio and the
// xrif rates are published; in every other non-transitional case the toggle is set Off.
// NOTE(review): the signature line and several numbered lines are absent from this listing -- confirm against the original file.
1476 inline
1478 {
1479  //Only update this if not changing
1480  if(m_writing == NOT_WRITING || m_writing == WRITING)
1481  {
1482  if(m_xrif && m_writing == WRITING)
1483  {
1484  indi::updateSwitchIfChanged(m_indiP_writing, "toggle", pcf::IndiElement::On, m_indiDriver, INDI_OK);
1485  indi::updateIfChanged(m_indiP_xrifStats, "ratio", m_xrif->compression_ratio, m_indiDriver, INDI_BUSY);
      //Rates are divided by 1048576 (2^20) before publishing under the "...MBsec" element names.
1486  indi::updateIfChanged(m_indiP_xrifStats, "encodeMBsec", m_xrif->encode_rate/1048576.0, m_indiDriver, INDI_BUSY);
1488  indi::updateIfChanged(m_indiP_xrifStats, "differenceMBsec", m_xrif->difference_rate/1048576.0, m_indiDriver, INDI_BUSY);
1490  indi::updateIfChanged(m_indiP_xrifStats, "reorderMBsec", m_xrif->reorder_rate/1048576.0, m_indiDriver, INDI_BUSY);
1492  indi::updateIfChanged(m_indiP_xrifStats, "compressMBsec", m_xrif->compress_rate/1048576.0, m_indiDriver, INDI_BUSY);
1494  }
1495  else
1496  {
1497  indi::updateSwitchIfChanged(m_indiP_writing, "toggle", pcf::IndiElement::Off, m_indiDriver, INDI_OK);
1507  }
1508  }
1509 }
1510 
// NOTE(review): both the signature line (1512) and the body line (1514) of this definition are absent
// from this listing, so what it does cannot be determined from here -- confirm against the original file.
1511 inline
1513 {
1515 }
1516 
// Forces a saving-state telemetry record by calling recordSavingState(true), and returns its result.
// NOTE(review): the signature line is absent from this listing -- confirm against the original file.
1517 inline
1519 {
1520  return recordSavingState(true);
1521 }
1522 
// Record a telem_saving_state entry when the saving state has changed, or when `force` is set.
//
// The reported state is 1 while m_writing == WRITING and 0 otherwise; it is recorded together with
// m_currSaveStart. Always returns 0.
// NOTE(review): the signature line is absent from this listing -- confirm against the original file.
1523 inline
1525 {
      //Last recorded values. These are function-local statics, so they persist across calls.
      //The -1 initializers ensure the first call sees a change (the uint64_t one wraps to its maximum value).
1526  static int16_t lastState = -1;
1527  static uint64_t currSaveStart = -1;
1528 
1529  int16_t state;
1530  if(m_writing == WRITING) state = 1;
1531  else state = 0;
1532 
1533  if(state != lastState || m_currSaveStart != currSaveStart || force)
1534  {
1535  telem<telem_saving_state>({state, m_currSaveStart});
1536 
1537  lastState = state;
1538  currSaveStart = m_currSaveStart;
1539  }
1540 
1541  return 0;
1542 }
1543 
// Record a telem_saving entry with the current xrif statistics when any of them has changed since
// the last record, or when `force` is set. Always returns 0.
// NOTE(review): m_xrif is dereferenced here without a null check -- confirm callers guarantee it is valid.
// NOTE(review): the signature line is absent from this listing -- confirm against the original file.
1544 inline
1546 {
      //Last recorded values. These are function-local statics, so they persist across calls.
1547  static uint32_t last_rawSize = -1;
1548  static uint32_t last_compressedSize = -1;
1549  static float last_encodeRate = -1;
1550  static float last_differenceRate = -1;
1551  static float last_reorderRate = -1;
1552  static float last_compressRate = -1;
1553 
1554  if(m_xrif->raw_size != last_rawSize || m_xrif->compressed_size != last_compressedSize || m_xrif->encode_rate != last_encodeRate || m_xrif->difference_rate != last_differenceRate ||
1555  m_xrif->reorder_rate != last_reorderRate || m_xrif->compress_rate != last_compressRate || force)
1556  {
      //Sizes are narrowed to uint32_t and rates to float for the telemetry record.
1557  telem<telem_saving>({(uint32_t) m_xrif->raw_size, (uint32_t) m_xrif->compressed_size, (float) m_xrif->encode_rate, (float) m_xrif->difference_rate, (float) m_xrif->reorder_rate, (float) m_xrif->compress_rate});
1558 
1559  last_rawSize = m_xrif->raw_size;
1560  last_compressedSize = m_xrif->compressed_size;
1561  last_encodeRate = m_xrif->encode_rate;
1562  last_differenceRate = m_xrif->difference_rate;
1563  last_reorderRate = m_xrif->reorder_rate;
1564  last_compressRate = m_xrif->compress_rate;
1565  }
1566 
1567  return 0;
1568 }
1569 
1570 }//namespace app
1571 } //namespace MagAOX
1572 #endif
The base-class for MagAO-X applications.
Definition: MagAOXApp.hpp:75
stateCodes::stateCodeT state()
Get the current state code.
Definition: MagAOXApp.hpp:2082
int registerIndiPropertyNew(pcf::IndiProperty &prop, int(*)(void *, const pcf::IndiProperty &))
Register an INDI property which is exposed for others to request a New Property for.
int createStandardIndiToggleSw(pcf::IndiProperty &prop, const std::string &name, const std::string &label="", const std::string &group="")
Create a standard R/W INDI switch with a single toggle element.
Definition: MagAOXApp.hpp:2321
int m_shutdown
Flag to signal it's time to shutdown. When not 0, the main loop exits.
Definition: MagAOXApp.hpp:102
int shutdown()
Get the value of the shutdown flag.
Definition: MagAOXApp.hpp:1146
indiDriver< MagAOXApp > * m_indiDriver
The INDI driver wrapper. Constructed and initialized by execute, which starts and stops communication...
Definition: MagAOXApp.hpp:537
static int log(const typename logT::messageT &msg, logPrioT level=logPrio::LOG_DEFAULT)
Make a log entry.
Definition: MagAOXApp.hpp:1590
int threadStart(std::thread &thrd, bool &thrdInit, pid_t &tpid, pcf::IndiProperty &thProp, int thrdPrio, const std::string &cpuset, const std::string &thrdName, thisPtr *thrdThis, Function &&thrdStart)
Start a thread, using this class's privileges to set priority, etc.
Definition: MagAOXApp.hpp:1950
std::string MagAOXPath
The base path of the MagAO-X system.
Definition: MagAOXApp.hpp:86
bool m_powerMgtEnabled
Flag controls whether power mgt is used. Set this in the constructor of a derived app....
Definition: MagAOXApp.hpp:981
pcf::IndiProperty m_indiP_xrifStats
pcf::IndiProperty m_indiP_writing
std::thread m_fgThread
A separate thread for the actual framegrabbings.
int m_fgThreadPrio
Priority of the framegrabber thread, should normally be > 0.
pid_t m_fgThreadID
F.g. thread PID.
size_t m_circBuffLength
The length of the circular buffer, in frames.
int initialize_xrif()
Initialize the xrif system.
int recordTelem(const telem_saving_state *)
uint8_t m_dataType
The ImageStreamIO type code.
bool m_fgThreadInit
Synchronizer to ensure f.g. thread initializes before doing dangerous things.
int m_typeSize
The pixel byte depth.
xrif_t m_xrif_timing
The xrif compression handle for image timing data.
uint64_t m_currSaveStart
The circular buffer position at which to start saving.
virtual int appStartup()
Startup functions.
size_t m_writeChunkLength
The number of frames to write at a time.
size_t m_height
The height of the image.
uint64_t m_currSaveStopFrameNo
The frame number of the image at which saving stopped (for logging)
int allocate_circbufs()
Worker function to allocate the circular buffers.
void handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
Handles SIGSEGV and SIGBUS. Sets m_restart to true.
int m_swThreadPrio
Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
int recordSavingStats(bool force=false)
pcf::IndiProperty m_swThreadProp
The property to hold the s.w. thread details.
unsigned m_semWait
The time in nsec to wait on the semaphore. Max is 999999999. Default is 5e8 nsec.
std::thread m_swThread
A separate thread for the actual writing.
std::string m_fgCpuset
The cpuset for the framegrabber thread. Ignored if empty (the default).
virtual int appLogic()
Implementation of the FSM for the stream writer.
static void fgThreadStart(streamWriter *s)
Thread starter, called by threadStart on thread construction. Calls fgThreadExec.
std::string m_outName
The name to use for outputting files, Default is m_shmimName.
INDI_NEWCALLBACK_DECL(streamWriter, m_indiP_writing)
char * m_xrif_timing_header
Storage for the xrif timing data file header.
~streamWriter() noexcept
Destructor.
int allocate_xrif()
Worker function to configure and allocate the xrif handles.
int setSigSegvHandler()
Sets the handler for SIGSEGV and SIGBUS.
std::string m_swCpuset
The cpuset for the stream writer thread. Ignored if empty (the default).
xrif_t m_xrif
The xrif compression handle for image data.
streamWriter()
Default c'tor.
virtual void setupConfig()
Setup the configuration system (called by MagAOXApp::setup())
int m_semaphoreNumber
The image structure semaphore index.
std::string m_rawimageDir
The path where files will be saved.
static void _handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
The handler called when SIGSEGV or SIGBUS is received, which will be due to ImageStreamIO server rese...
int recordSavingState(bool force=false)
int m_writing
Controls whether or not images are being written, and sequences start and stop of writing.
uint64_t m_currSaveStartFrameNo
The frame number of the image at which saving started (for logging)
uint64_t m_currChunkStart
The circular buffer starting position of the current to-be-written chunk.
pid_t m_swThreadID
S.w. thread pid.
bool m_logSaveStart
Flag indicating that the start saving log entry should be made.
virtual void loadConfig()
load the configuration system results (called by MagAOXApp::setup())
pcf::IndiProperty m_fgThreadProp
The property to hold the f.g. thread details.
void swThreadExec()
Execute the stream writer main loop.
uint64_t m_nextChunkStart
The circular buffer starting position of the next to-be-written chunk.
static streamWriter * m_selfWriter
Static pointer to this (set in constructor). Used for getting out of the static SIGSEGV handler.
char * m_xrif_header
Storage for the xrif image data file header.
int doEncode()
Function called when semaphore is raised to do the encode and write.
static void swThreadStart(streamWriter *s)
Thread starter, called by threadStart on thread construction. Calls swThreadExec.
uint64_t m_currSaveStop
The circular buffer position at which to stop saving.
void fgThreadExec()
Execute the frame grabber main loop.
std::string m_shmimName
The name of the shared memory buffer.
dev::telemeter< streamWriter > telemeterT
size_t m_width
The width of the image.
virtual int appShutdown()
Do any needed shutdown tasks. Currently nothing in this app.
sem_t m_swSemaphore
Semaphore used to synchronize the fg thread and the sw thread.
bool m_swThreadInit
Synchronizer to ensure s.w. thread initializes before doing dangerous things.
#define MAGAOX_rawimageRelPath
The relative path to the raw images directory.
Definition: paths.hpp:92
#define REG_INDI_NEWPROP_NOCB(prop, propName, type)
Register a NEW INDI property with the class, with no callback.
Definition: indiMacros.hpp:247
#define INDI_NEWCALLBACK(prop)
Get the name of the static callback wrapper for a new property.
Definition: indiMacros.hpp:207
@ OPERATING
The device is operating, other than homing.
Definition: stateCodes.hpp:50
@ READY
The device is ready for operation, but is not operating.
Definition: stateCodes.hpp:51
#define INDI_IDLE
Definition: indiUtils.hpp:28
#define INDI_BUSY
Definition: indiUtils.hpp:30
#define INDI_OK
Definition: indiUtils.hpp:29
std::ostream & cerr()
void updateIfChanged(pcf::IndiProperty &p, const std::string &el, const T &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
Definition: indiUtils.hpp:95
void updateSwitchIfChanged(pcf::IndiProperty &p, const std::string &el, const pcf::IndiElement::SwitchStateType &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
Definition: indiUtils.hpp:206
const pcf::IndiProperty & ipRecv
INDI_VALIDATE_CALLBACK_PROPS(function, ipRecv)
INDI_NEWCALLBACK_DEFN(acesxeCtrl, m_indiP_windspeed)(const pcf
Definition: acesxeCtrl.hpp:687
Definition: dm.hpp:24
constexpr static logPrioT LOG_DEBUG
Used for debugging.
Definition: logPriority.hpp:52
constexpr static logPrioT LOG_CRITICAL
The process can not continue and will shut down (fatal)
Definition: logPriority.hpp:37
constexpr static logPrioT LOG_WARNING
A condition has occurred which may become an error, but the process continues.
Definition: logPriority.hpp:43
constexpr static logPrioT LOG_NOTICE
A normal but significant condition.
Definition: logPriority.hpp:46
#define NOT_WRITING
#define STOP_WRITING
#define WRITING
#define START_WRITING
A device which saves telemetry.
Definition: telemeter.hpp:52
int appShutdown()
Perform telemeter application shutdown.
Definition: telemeter.hpp:259
int loadConfig(appConfigurator &config)
Load the device section from an application configurator.
Definition: telemeter.hpp:208
int appLogic()
Perform telemeter application logic.
Definition: telemeter.hpp:253
int setupConfig(appConfigurator &config)
Setup an application configurator for the device section.
Definition: telemeter.hpp:195
int appStartup()
Starts the telemetry log thread.
Definition: telemeter.hpp:226
int checkRecordTimes(const telT &tel, telTs... tels)
Check the time of the last record for each telemetry type and make an entry if needed.
Definition: telemeter.hpp:266
Software CRITICAL log entry.
Software ERR log entry.