API
streamWriter.hpp
Go to the documentation of this file.
1 /** \file streamWriter.hpp
2  * \brief The MagAO-X Image Stream Writer
3  *
4  * \author Jared R. Males (jaredmales@gmail.com)
5  *
6  * \ingroup streamWriter_files
7  */
8 
9 #ifndef streamWriter_hpp
10 #define streamWriter_hpp
11 
12 #include <ImageStreamIO/ImageStruct.h>
13 #include <ImageStreamIO/ImageStreamIO.h>
14 
15 #include <xrif/xrif.h>
16 
17 #include <mx/sys/timeUtils.hpp>
18 
19 #include "../../libMagAOX/libMagAOX.hpp" //Note this is included on command line to trigger pch
20 #include "../../magaox_git_version.h"
21 
22 #define NOT_WRITING (0)
23 #define START_WRITING (1)
24 #define WRITING (2)
25 #define STOP_WRITING (3)
26 
27 namespace MagAOX
28 {
29 namespace app
30 {
31 
32 /** \defgroup streamWriter ImageStreamIO Stream Writing
33  * \brief Writes the contents of an ImageStreamIO image stream to disk.
34  *
35  * <a href="../handbook/operating/software/apps/streamWriter.html">Application Documentation</a>
36  *
37  * \ingroup apps
38  *
39  */
40 
41 /** \defgroup streamWriter_files ImageStreamIO Stream Writing
42  * \ingroup streamWriter
43  */
44 
45 /** MagAO-X application to control writing ImageStreamIO streams to disk.
46  *
47  * \ingroup streamWriter
48  *
49  */
50 class streamWriter : public MagAOXApp<>, public dev::telemeter<streamWriter>
51 {
53 
54  friend class dev::telemeter<streamWriter>;
55 
56  // Give the test harness access.
57  friend class streamWriter_test;
58 
59 protected:
60  /** \name configurable parameters
61  *@{
62  */
63 
64  std::string m_rawimageDir; ///< The path where files will be saved.
65 
66  size_t m_circBuffLength{1024}; ///< The length of the circular buffer, in frames
67 
68  size_t m_writeChunkLength{512}; ///< The number of frames to write at a time
69 
70  double m_maxChunkTime{10}; ///< The maximum time before writing regardless of number of frames.
71 
72  std::string m_shmimName; ///< The name of the shared memory buffer.
73 
74  std::string m_outName; ///< The name to use for outputting files, Default is m_shmimName.
75 
76  int m_semaphoreNumber{7}; ///< The image structure semaphore index.
77 
78  unsigned m_semWaitSec{0}; ///< The time in whole sec to wait on the semaphore, to which m_semWaitNSec is added. Default is 0 nsec.
79 
80  unsigned m_semWaitNSec{500000000}; ///< The time in nsec to wait on the semaphore, added to m_semWaitSec. Max is 999999999. Default is 5e8 nsec.
81 
82  int m_lz4accel{1};
83 
84  bool m_compress{true};
85 
86  ///@}
87 
88  size_t m_width{0}; ///< The width of the image
89  size_t m_height{0}; ///< The height of the image
90  uint8_t m_dataType{0}; ///< The ImageStreamIO type code.
91  int m_typeSize{0}; ///< The pixel byte depth
92 
93  char *m_rawImageCircBuff{nullptr};
94  uint64_t *m_timingCircBuff{nullptr};
95 
96  size_t m_currImage{0};
97 
98  double m_currImageTime{0}; ///< The write-time of the current image
99 
100  double m_currChunkStartTime{0}; ///< The write-time of the first image in the chunk
101 
102  // Writer book-keeping:
103  int m_writing{NOT_WRITING}; ///< Controls whether or not images are being written, and sequences start and stop of writing.
104 
105  uint64_t m_currChunkStart{0}; ///< The circular buffer starting position of the current to-be-written chunk.
106  uint64_t m_nextChunkStart{0}; ///< The circular buffer starting position of the next to-be-written chunk.
107 
108  uint64_t m_currSaveStart{0}; ///< The circular buffer position at which to start saving.
109  uint64_t m_currSaveStop{0}; ///< The circular buffer position at which to stop saving.
110 
111  uint64_t m_currSaveStopFrameNo{0}; ///< The frame number of the image at which saving stopped (for logging)
112 
113  /// The xrif compression handle for image data
114  xrif_t m_xrif{nullptr};
115 
116  /// Storage for the xrif image data file header
117  char *m_xrif_header{nullptr};
118 
119  /// The xrif compression handle for image data
120  xrif_t m_xrif_timing{nullptr};
121 
122  /// Storage for the xrif image data file header
123  char *m_xrif_timing_header{nullptr};
124 
125 public:
126  /// Default c'tor
127  streamWriter();
128 
129  /// Destructor
130  ~streamWriter() noexcept;
131 
132  /// Setup the configuration system (called by MagAOXApp::setup())
133  virtual void setupConfig();
134 
135  /// load the configuration system results (called by MagAOXApp::setup())
136  virtual void loadConfig();
137 
138  /// Startup functions
139  /** Sets up the INDI vars.
140  *
141  */
142  virtual int appStartup();
143 
144  /// Implementation of the FSM for the Siglent SDG
145  virtual int appLogic();
146 
147  /// Do any needed shutdown tasks. Currently nothing in this app.
148  virtual int appShutdown();
149 
150 protected:
151  /** \name SIGSEGV & SIGBUS signal handling
152  * These signals occur as a result of a ImageStreamIO source server resetting (e.g. changing frame sizes).
153  * When they occur a restart of the framegrabber and framewriter thread main loops is triggered.
154  *
155  * @{
156  */
157  bool m_restart{false};
158 
159  static streamWriter *m_selfWriter; ///< Static pointer to this (set in constructor). Used for getting out of the static SIGSEGV handler.
160 
161  /// Initialize the xrif system.
162  /** Allocates the handles and headers pointers.
163  *
164  * \returns 0 on success.
165  * \returns -1 on error.
166  */
167  int initialize_xrif();
168 
169  /// Sets the handler for SIGSEGV and SIGBUS
170  /** These are caused by ImageStreamIO server resets.
171  */
172  int setSigSegvHandler();
173 
174  /// The handler called when SIGSEGV or SIGBUS is received, which will be due to ImageStreamIO server resets. Just a wrapper for handlerSigSegv.
175  static void _handlerSigSegv(int signum,
176  siginfo_t *siginf,
177  void *ucont);
178 
179  /// Handles SIGSEGV and SIGBUS. Sets m_restart to true.
180  void handlerSigSegv(int signum,
181  siginfo_t *siginf,
182  void *ucont);
183  ///@}
184 
185  /** \name Framegrabber Thread
186  * This thread monitors the ImageStreamIO buffer and copies its images to the circular buffer.
187  *
188  * @{
189  */
190  int m_fgThreadPrio{1}; ///< Priority of the framegrabber thread, should normally be > 00.
191 
192  std::string m_fgCpuset; ///< The cpuset for the framegrabber thread. Ignored if empty (the default).
193 
194  std::thread m_fgThread; ///< A separate thread for the actual framegrabbings
195 
196  bool m_fgThreadInit{true}; ///< Synchronizer to ensure f.g. thread initializes before doing dangerous things.
197 
198  pid_t m_fgThreadID{0}; ///< F.g. thread PID.
199 
200  pcf::IndiProperty m_fgThreadProp; ///< The property to hold the f.g. thread details.
201 
202  /// Worker function to allocate the circular buffers.
203  /** This takes place in the fg thread after connecting to the stream.
204  *
205  * \returns 0 on sucess.
206  * \returns -1 on error.
207  */
208  int allocate_circbufs();
209 
210  /// Worker function to configure and allocate the xrif handles.
211  /** This takes place in the fg thread after connecting to the stream.
212  *
213  * \returns 0 on sucess.
214  * \returns -1 on error.
215  */
216  int allocate_xrif();
217 
218  /// Thread starter, called by fgThreadStart on thread construction. Calls fgThreadExec.
219  static void fgThreadStart(streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */);
220 
221  /// Execute the frame grabber main loop.
222  void fgThreadExec();
223 
224  ///@}
225 
226  /** \name Stream Writer Thread
227  * This thread writes chunks of the circular buffer to disk.
228  *
229  * @{
230  */
231  int m_swThreadPrio{1}; ///< Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
232 
233  std::string m_swCpuset; ///< The cpuset for the framegrabber thread. Ignored if empty (the default).
234 
235  sem_t m_swSemaphore; ///< Semaphore used to synchronize the fg thread and the sw thread.
236 
237  std::thread m_swThread; ///< A separate thread for the actual writing
238 
239  bool m_swThreadInit{true}; ///< Synchronizer to ensure s.w. thread initializes before doing dangerous things.
240 
241  pid_t m_swThreadID{0}; ///< S.w. thread pid.
242 
243  pcf::IndiProperty m_swThreadProp; ///< The property to hold the s.w. thread details.
244 
245  size_t m_fnameSz{0};
246 
247  char *m_fname{nullptr};
248 
249  std::string m_fnameBase;
250 
251  /// Thread starter, called by swThreadStart on thread construction. Calls swThreadExec.
252  static void swThreadStart(streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */);
253 
254  /// Execute the stream writer main loop.
255  void swThreadExec();
256 
257  /// Function called when semaphore is raised to do the encode and write.
258  int doEncode();
259  ///@}
260 
261  // INDI:
262 protected:
263  // declare our properties
264  pcf::IndiProperty m_indiP_writing;
265 
266  pcf::IndiProperty m_indiP_xrifStats;
267 
268 public:
270 
271  void updateINDI();
272 
273  /** \name Telemeter Interface
274  *
275  * @{
276  */
277  int checkRecordTimes();
278 
279  int recordTelem(const telem_saving_state *);
280 
281  int recordSavingState(bool force = false);
282  int recordSavingStats(bool force = false);
283 
284  ///@}
285 };
286 
287 // Set self pointer to null so app starts up uninitialized.
289 
290 streamWriter::streamWriter() : MagAOXApp(MAGAOX_CURRENT_SHA1, MAGAOX_REPO_MODIFIED)
291 {
292  m_powerMgtEnabled = false;
293 
294  m_selfWriter = this;
295 
296  return;
297 }
298 
300 {
301  if (m_xrif)
302  xrif_delete(m_xrif);
303 
304  if (m_xrif_header)
305  free(m_xrif_header);
306 
307  if (m_xrif_timing)
308  xrif_delete(m_xrif_timing);
309 
311  free(m_xrif_timing_header);
312 
313  return;
314 }
315 
317 {
318  config.add("writer.savePath", "", "writer.savePath", argType::Required, "writer", "savePath", false, "string", "The absolute path where images are saved. Will use MagAO-X default if not set.");
319 
320  config.add("writer.circBuffLength", "", "writer.circBuffLength", argType::Required, "writer", "circBuffLength", false, "size_t", "The length in frames of the circular buffer. Should be an integer multiple of and larger than writeChunkLength.");
321 
322  config.add("writer.writeChunkLength", "", "writer.writeChunkLength", argType::Required, "writer", "writeChunkLength", false, "size_t", "The length in frames of the chunks to write to disk. Should be smaller than circBuffLength.");
323 
324  config.add("writer.maxChunkTime", "", "writer.maxChunkTime", argType::Required, "writer", "maxChunkTime", false, "float", "The max length in seconds of the chunks to write to disk. Default is 60 sec.");
325 
326  config.add("writer.threadPrio", "", "writer.threadPrio", argType::Required, "writer", "threadPrio", false, "int", "The real-time priority of the stream writer thread.");
327 
328  config.add("writer.cpuset", "", "writer.cpuset", argType::Required, "writer", "cpuset", false, "int", "The cpuset for the writer thread.");
329 
330  config.add("writer.compress", "", "writer.compress", argType::Required, "writer", "compress", false, "bool", "Flag to set whether compression is used. Default true.");
331 
332  config.add("writer.lz4accel", "", "writer.lz4accel", argType::Required, "writer", "lz4accel", false, "int", "The LZ4 acceleration parameter. Larger is faster, but lower compression.");
333 
334  config.add("writer.outName", "", "writer.outName", argType::Required, "writer", "outName", false, "int", "The name to use for output files. Default is the shmimName.");
335 
336  config.add("framegrabber.shmimName", "", "framegrabber.shmimName", argType::Required, "framegrabber", "shmimName", false, "int", "The name of the stream to monitor. From /tmp/shmimName.im.shm.");
337 
338  config.add("framegrabber.semaphoreNumber", "", "framegrabber.semaphoreNumber", argType::Required, "framegrabber", "semaphoreNumber", false, "int", "The semaphore to wait on. Default is 7.");
339 
340  config.add("framegrabber.semWait", "", "framegrabber.semWait", argType::Required, "framegrabber", "semWait", false, "int", "The time in nsec to wait on the semaphore. Max is 999999999. Default is 5e8 nsec.");
341 
342  config.add("framegrabber.threadPrio", "", "framegrabber.threadPrio", argType::Required, "framegrabber", "threadPrio", false, "int", "The real-time priority of the framegrabber thread.");
343 
344  config.add("framegrabber.cpuset", "", "framegrabber.cpuset", argType::Required, "framegrabber", "cpuset", false, "string", "The cpuset for the framegrabber thread.");
345 
346  telemeterT::setupConfig(config);
347 }
348 
350 {
351 
352  config(m_circBuffLength, "writer.circBuffLength");
353  config(m_writeChunkLength, "writer.writeChunkLength");
354  config(m_maxChunkTime, "writer.maxChunkTime");
355  config(m_swThreadPrio, "writer.threadPrio");
356  config(m_swCpuset, "writer.cpuset");
357  config(m_compress, "writer.compress");
358  config(m_lz4accel, "writer.lz4accel");
359  if (m_lz4accel < XRIF_LZ4_ACCEL_MIN)
360  m_lz4accel = XRIF_LZ4_ACCEL_MIN;
361  if (m_lz4accel > XRIF_LZ4_ACCEL_MAX)
362  m_lz4accel = XRIF_LZ4_ACCEL_MAX;
363 
364  config(m_shmimName, "framegrabber.shmimName");
365 
367  config(m_outName, "writer.outName");
368 
369  config(m_semaphoreNumber, "framegrabber.semaphoreNumber");
370  config(m_semWaitNSec, "framegrabber.semWait");
371 
372  config(m_fgThreadPrio, "framegrabber.threadPrio");
373  config(m_fgCpuset, "framegrabber.cpuset");
374 
375  // Set some defaults
376  // Setup default log path
378  config(m_rawimageDir, "writer.savePath");
379 
380  if (telemeterT::loadConfig(config) < 0)
381  {
382  log<text_log>("Error during telemeter config", logPrio::LOG_CRITICAL);
383  m_shutdown = true;
384  }
385 }
386 
388 {
389  // Create save directory.
390  errno = 0;
391  if (mkdir(m_rawimageDir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) < 0)
392  {
393  if (errno != EEXIST)
394  {
395  std::stringstream logss;
396  logss << "Failed to create image directory (" << m_rawimageDir << "). Errno says: " << strerror(errno);
397  log<software_critical>({__FILE__, __LINE__, errno, 0, logss.str()});
398 
399  return -1;
400  }
401  }
402 
403  // set up the INDI properties
406 
407  // Register the stats INDI property
408  REG_INDI_NEWPROP_NOCB(m_indiP_xrifStats, "xrif", pcf::IndiProperty::Number);
409  m_indiP_xrifStats.setLabel("xrif compression performance");
410 
411  indi::addNumberElement<float>(m_indiP_xrifStats, "ratio", 0, 1.0, 0.0, "%0.2f", "Compression Ratio");
412 
413  indi::addNumberElement<float>(m_indiP_xrifStats, "differenceMBsec", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Differencing Rate [MB/sec]");
414 
415  indi::addNumberElement<float>(m_indiP_xrifStats, "reorderMBsec", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Reordering Rate [MB/sec]");
416 
417  indi::addNumberElement<float>(m_indiP_xrifStats, "compressMBsec", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Compression Rate [MB/sec]");
418 
419  indi::addNumberElement<float>(m_indiP_xrifStats, "encodeMBsec", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Total Encoding Rate [MB/sec]");
420 
421  indi::addNumberElement<float>(m_indiP_xrifStats, "differenceFPS", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Differencing Rate [f.p.s.]");
422 
423  indi::addNumberElement<float>(m_indiP_xrifStats, "reorderFPS", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Reordering Rate [f.p.s.]");
424 
425  indi::addNumberElement<float>(m_indiP_xrifStats, "compressFPS", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Compression Rate [f.p.s.]");
426 
427  indi::addNumberElement<float>(m_indiP_xrifStats, "encodeFPS", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Total Encoding Rate [f.p.s.]");
428 
429  // Now set up the framegrabber and writer threads.
430  // - need SIGSEGV and SIGBUS handling for ImageStreamIO restarts
431  // - initialize the semaphore
432  // - start the threads
433 
434  if (setSigSegvHandler() < 0)
435  return log<software_error, -1>({__FILE__, __LINE__});
436 
437  if (sem_init(&m_swSemaphore, 0, 0) < 0)
438  return log<software_critical, -1>({__FILE__, __LINE__, errno, 0, "Initializing S.W. semaphore"});
439 
440  // Check if we have a safe writeChunkLengthh
442  {
443  return log<software_critical, -1>({__FILE__, __LINE__, "Write chunk length is not a divisor of circular buffer length."});
444  }
445 
446  if (initialize_xrif() < 0)
447  log<software_critical, -1>({__FILE__, __LINE__});
448 
450  {
451  return log<software_critical, -1>({__FILE__, __LINE__});
452  }
453 
455  {
456  log<software_critical, -1>({__FILE__, __LINE__});
457  }
458 
459  if (telemeterT::appStartup() < 0)
460  {
461  return log<software_error, -1>({__FILE__, __LINE__});
462  }
463 
464  return 0;
465 }
466 
468 {
469 
470  // first do a join check to see if other threads have exited.
471  // these will throw if the threads are really gone
472  try
473  {
474  if (pthread_tryjoin_np(m_fgThread.native_handle(), 0) == 0)
475  {
476  log<software_error>({__FILE__, __LINE__, "framegrabber thread has exited"});
477  return -1;
478  }
479  }
480  catch (...)
481  {
482  log<software_error>({__FILE__, __LINE__, "framegrabber thread has exited"});
483  return -1;
484  }
485 
486  try
487  {
488  if (pthread_tryjoin_np(m_swThread.native_handle(), 0) == 0)
489  {
490  log<software_error>({__FILE__, __LINE__, "stream thread has exited"});
491  return -1;
492  }
493  }
494  catch (...)
495  {
496  log<software_error>({__FILE__, __LINE__, "streamwriter thread has exited"});
497  return -1;
498  }
499 
500  switch (m_writing)
501  {
502  case NOT_WRITING:
504  break;
505  default:
507  }
508 
509  if (state() == stateCodes::OPERATING)
510  {
511  if (telemeterT::appLogic() < 0)
512  {
513  log<software_error>({__FILE__, __LINE__});
514  return 0;
515  }
516  }
517 
518  updateINDI();
519 
520  return 0;
521 }
522 
524 {
525  try
526  {
527  if (m_fgThread.joinable())
528  {
529  m_fgThread.join();
530  }
531  }
532  catch (...)
533  {
534  }
535 
536  try
537  {
538  if (m_swThread.joinable())
539  {
540  m_swThread.join();
541  }
542  }
543  catch (...)
544  {
545  }
546 
547  if (m_xrif)
548  {
549  xrif_delete(m_xrif);
550  m_xrif = nullptr;
551  }
552 
553  if (m_xrif_timing)
554  {
555  xrif_delete(m_xrif_timing);
556  m_xrif_timing = nullptr;
557  }
558 
560 
561  return 0;
562 }
563 
565 {
566  xrif_error_t rv = xrif_new(&m_xrif);
567  if (rv != XRIF_NOERROR)
568  {
569  return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error."});
570  }
571 
572  if (m_compress)
573  {
574  rv = xrif_configure(m_xrif, XRIF_DIFFERENCE_PREVIOUS, XRIF_REORDER_BYTEPACK, XRIF_COMPRESS_LZ4);
575  if (rv != XRIF_NOERROR)
576  {
577  return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle configuration error."});
578  }
579  }
580  else
581  {
582  std::cerr << "not compressing . . . \n";
583  rv = xrif_configure(m_xrif, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE);
584  if (rv != XRIF_NOERROR)
585  {
586  return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle configuration error."});
587  }
588  }
589 
590  errno = 0;
591  m_xrif_header = (char *)malloc(XRIF_HEADER_SIZE * sizeof(char));
592  if (m_xrif_header == NULL)
593  {
594  return log<software_critical, -1>({__FILE__, __LINE__, errno, 0, "xrif header allocation failed."});
595  }
596 
597  rv = xrif_new(&m_xrif_timing);
598  if (rv != XRIF_NOERROR)
599  {
600  return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error."});
601  }
602 
603  // m_xrif_timing->reorder_method = XRIF_REORDER_NONE;
604  rv = xrif_configure(m_xrif_timing, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE);
605  if (rv != XRIF_NOERROR)
606  {
607  return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle configuration error."});
608  }
609 
610  errno = 0;
611  m_xrif_timing_header = (char *)malloc(XRIF_HEADER_SIZE * sizeof(char));
612  if (m_xrif_timing_header == NULL)
613  {
614  return log<software_critical, -1>({__FILE__, __LINE__, errno, 0, "xrif header allocation failed."});
615  }
616 
617  return 0;
618 }
619 
621 {
622  struct sigaction act;
623  sigset_t set;
624 
625  act.sa_sigaction = &streamWriter::_handlerSigSegv;
626  act.sa_flags = SA_SIGINFO;
627  sigemptyset(&set);
628  act.sa_mask = set;
629 
630  errno = 0;
631  if (sigaction(SIGSEGV, &act, 0) < 0)
632  {
633  std::string logss = "Setting handler for SIGSEGV failed. Errno says: ";
634  logss += strerror(errno);
635 
636  log<software_error>({__FILE__, __LINE__, errno, 0, logss});
637 
638  return -1;
639  }
640 
641  errno = 0;
642  if (sigaction(SIGBUS, &act, 0) < 0)
643  {
644  std::string logss = "Setting handler for SIGBUS failed. Errno says: ";
645  logss += strerror(errno);
646 
647  log<software_error>({__FILE__, __LINE__, errno, 0, logss});
648 
649  return -1;
650  }
651 
652  log<text_log>("Installed SIGSEGV/SIGBUS signal handler.", logPrio::LOG_DEBUG);
653 
654  return 0;
655 }
656 
658  siginfo_t *siginf,
659  void *ucont)
660 {
661  m_selfWriter->handlerSigSegv(signum, siginf, ucont);
662 }
663 
665  siginfo_t *siginf,
666  void *ucont)
667 {
668  static_cast<void>(signum);
669  static_cast<void>(siginf);
670  static_cast<void>(ucont);
671 
672  m_restart = true;
673 
674  return;
675 }
676 
678 {
679  if (m_rawImageCircBuff)
680  {
681  free(m_rawImageCircBuff);
682  }
683 
684  errno = 0;
686 
687  if (m_rawImageCircBuff == NULL)
688  {
689  return log<software_critical, -1>({__FILE__, __LINE__, errno, 0, "buffer allocation failure"});
690  }
691 
692  if (m_timingCircBuff)
693  {
694  free(m_timingCircBuff);
695  }
696 
697  errno = 0;
698  m_timingCircBuff = (uint64_t *)malloc(5 * sizeof(uint64_t) * m_circBuffLength);
699  if (m_timingCircBuff == NULL)
700  {
701  return log<software_critical, -1>({__FILE__, __LINE__, errno, 0, "buffer allocation failure"});
702  }
703 
704  return 0;
705 }
706 
708 {
709  // Set up the image data xrif handle
710  xrif_error_t rv;
711 
712  if (m_compress)
713  {
714  rv = xrif_configure(m_xrif, XRIF_DIFFERENCE_PREVIOUS, XRIF_REORDER_BYTEPACK, XRIF_COMPRESS_LZ4);
715  if (rv != XRIF_NOERROR)
716  {
717  return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle configuration error."});
718  }
719  }
720  else
721  {
722  std::cerr << "not compressing . . . \n";
723  rv = xrif_configure(m_xrif, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE);
724  if (rv != XRIF_NOERROR)
725  {
726  return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle configuration error."});
727  }
728  }
729 
730  rv = xrif_set_size(m_xrif, m_width, m_height, 1, m_writeChunkLength, m_dataType);
731  if (rv != XRIF_NOERROR)
732  {
733  return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif_set_size error."});
734  }
735 
736  rv = xrif_allocate_raw(m_xrif);
737  if (rv != XRIF_NOERROR)
738  {
739  return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif_allocate_raw error."});
740  }
741 
742  rv = xrif_allocate_reordered(m_xrif);
743  if (rv != XRIF_NOERROR)
744  {
745  return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error."});
746  }
747 
748  // Set up the timing data xrif handle
749  rv = xrif_configure(m_xrif_timing, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE);
750  if (rv != XRIF_NOERROR)
751  {
752  return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle configuration error."});
753  }
754 
755  rv = xrif_set_size(m_xrif_timing, 5, 1, 1, m_writeChunkLength, XRIF_TYPECODE_UINT64);
756  if (rv != XRIF_NOERROR)
757  {
758  return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif_set_size error."});
759  }
760 
761  rv = xrif_allocate_raw(m_xrif_timing);
762  if (rv != XRIF_NOERROR)
763  {
764  return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif_allocate_raw error."});
765  }
766 
767  rv = xrif_allocate_reordered(m_xrif_timing);
768  if (rv != XRIF_NOERROR)
769  {
770  return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error."});
771  }
772 
773  return 0;
774 }
775 
777 {
778  o->fgThreadExec();
779 }
780 
782 {
783  m_fgThreadID = syscall(SYS_gettid);
784 
785  // Wait fpr the thread starter to finish initializing this thread.
786  while (m_fgThreadInit == true && m_shutdown == 0)
787  {
788  sleep(1);
789  }
790 
791  timespec missing_ts;
792 
793  IMAGE image;
794  ino_t inode = 0; // The inode of the image stream file
795 
796  bool opened = false;
797 
798  while (m_shutdown == 0)
799  {
800  /* Initialize ImageStreamIO
801  */
802  opened = false;
803  m_restart = false; // Set this up front, since we're about to restart.
804 
805  sem_t *sem{nullptr}; ///< The semaphore to monitor for new image data
806 
807  int logged = 0;
808  while (!opened && !m_shutdown && !m_restart)
809  {
810  // b/c ImageStreamIO prints every single time, and latest version don't support stopping it yet, and that isn't thread-safe-able anyway
811  // we do our own checks. This is the same code in ImageStreamIO_openIm...
812  int SM_fd;
813  char SM_fname[200];
814  ImageStreamIO_filename(SM_fname, sizeof(SM_fname), m_shmimName.c_str());
815  SM_fd = open(SM_fname, O_RDWR);
816  if (SM_fd == -1)
817  {
818  if (!logged)
819  log<text_log>("ImageStream " + m_shmimName + " not found (yet). Retrying . . .", logPrio::LOG_NOTICE);
820  logged = 1;
821  sleep(1); // be patient
822  continue;
823  }
824 
825  // Found and opened, close it and then use ImageStreamIO
826  logged = 0;
827  close(SM_fd);
828 
829  if (ImageStreamIO_openIm(&image, m_shmimName.c_str()) == 0)
830  {
831  if (image.md[0].sem <= m_semaphoreNumber) ///<\todo this isn't right--> isn't there a define in cacao to use?
832  {
833  ImageStreamIO_closeIm(&image);
834  mx::sys::sleep(1); // We just need to wait for the server process to finish startup.
835  }
836  else
837  {
838  opened = true;
839 
840  char SM_fname[200];
841  ImageStreamIO_filename(SM_fname, sizeof(SM_fname), m_shmimName.c_str());
842 
843  struct stat buffer;
844  int rv = stat(SM_fname, &buffer);
845 
846  if (rv != 0)
847  {
848  log<software_critical>({__FILE__, __LINE__, errno, "Could not get inode for " + m_shmimName + ". Source process will need to be restarted."});
849  ImageStreamIO_closeIm(&image);
850  return;
851  }
852  inode = buffer.st_ino;
853  }
854  }
855  else
856  {
857  mx::sys::sleep(1); // be patient
858  }
859  }
860 
861  if (m_restart)
862  continue; // this is kinda dumb. we just go around on restart, so why test in the while loop at all?
863 
864  if (m_shutdown || !opened)
865  {
866  if (!opened)
867  return;
868 
869  ImageStreamIO_closeIm(&image);
870  return;
871  }
872 
873  // now get a good semaphore
874  m_semaphoreNumber = ImageStreamIO_getsemwaitindex(&image, m_semaphoreNumber); // ask for semaphore we had before
875 
876  if (m_semaphoreNumber < 0)
877  {
878  log<software_critical>({__FILE__, __LINE__, "No valid semaphore found for " + m_shmimName + ". Source process will need to be restarted."});
879  return;
880  }
881 
882  log<software_info>({__FILE__, __LINE__, "got semaphore index " + std::to_string(m_semaphoreNumber) + " for " + m_shmimName});
883 
884  ImageStreamIO_semflush(&image, m_semaphoreNumber);
885 
886  sem = image.semptr[m_semaphoreNumber]; ///< The semaphore to monitor for new image data
887 
888  m_dataType = image.md[0].datatype;
889  m_typeSize = ImageStreamIO_typesize(m_dataType);
890  m_width = image.md[0].size[0];
891  m_height = image.md[0].size[1];
892  size_t length;
893  if (image.md[0].naxis == 3)
894  {
895  length = image.md[0].size[2];
896  }
897  else
898  {
899  length = 1;
900  }
901  std::cerr << "connected"
902  << " " << m_width << "x" << m_height << "x" << (int)m_dataType << " (" << m_typeSize << ")" << std::endl;
903 
904  // Now allocate the circBuffs
905  if (allocate_circbufs() < 0)
906  return; // will cause shutdown!
907 
908  // And allocate the xrifs
909  if (allocate_xrif() < 0)
910  return; // Will cause shutdown!
911 
912  uint8_t atype;
913  size_t snx, sny, snz;
914 
915  uint64_t curr_image; // The current cnt1 index
916  m_currImage = 0;
917  m_currChunkStart = 0;
918  m_nextChunkStart = 0;
919 
920  // Initialized curr_image ...
921  if (image.md[0].naxis > 2)
922  {
923  curr_image = image.md[0].cnt1;
924  }
925  else
926  {
927  curr_image = 0;
928  }
929 
930  uint64_t last_cnt0; // = ((uint64_t)-1);
931 
932  // so we can initialize last_cnt0 to avoid frame skip on startup
933  if (image.cntarray)
934  {
935  last_cnt0 = image.cntarray[curr_image];
936  }
937  else
938  {
939  last_cnt0 = image.md[0].cnt0;
940  }
941 
942  int cnt0flag = 0;
943 
944  bool restartWriting = false; // flag to prevent logging on a logging restart
945 
946  // This is the main image grabbing loop.
947  while (!m_shutdown && !m_restart)
948  {
949  timespec ts;
951 
952  if(sem_timedwait(sem, &ts) == 0)
953  {
954  if (image.md[0].naxis > 2)
955  {
956  curr_image = image.md[0].cnt1;
957  }
958  else
959  {
960  curr_image = 0;
961  }
962 
963  atype = image.md[0].datatype;
964  snx = image.md[0].size[0];
965  sny = image.md[0].size[1];
966  if (image.md[0].naxis == 3)
967  {
968  snz = image.md[0].size[2];
969  }
970  else
971  {
972  snz = 1;
973  }
974 
975  if (atype != m_dataType || snx != m_width || sny != m_height || snz != length)
976  {
977  break; // exit the nearest while loop and get the new image setup.
978  }
979 
980  if (m_shutdown || m_restart)
981  {
982  break; // Check for exit signals
983  }
984 
985  uint64_t new_cnt0;
986  if (image.cntarray)
987  {
988  new_cnt0 = image.cntarray[curr_image];
989  }
990  else
991  {
992  new_cnt0 = image.md[0].cnt0;
993  }
994 
995  #ifdef SW_DEBUG
996  std::cerr << "new_cnt0: " << new_cnt0 << "\n";
997  #endif
998 
999  ///\todo cleanup skip frame handling.
1000  if (new_cnt0 == last_cnt0) //<- this probably isn't useful really
1001  {
1002  log<text_log>("semaphore raised but cnt0 has not changed -- we're probably getting behind", logPrio::LOG_WARNING);
1003  ++cnt0flag;
1004  if (cnt0flag > 10)
1005  {
1006  m_restart = true; // if we get here 10 times then something else is wrong.
1007  }
1008  continue;
1009  }
1010 
1011  if (new_cnt0 - last_cnt0 > 1) //<- this is what we want to check.
1012  {
1013  log<text_log>("cnt0 changed by more than 1. Frame skipped.", logPrio::LOG_WARNING);
1014  }
1015 
1016  cnt0flag = 0;
1017 
1018  last_cnt0 = new_cnt0;
1019 
1020  char *curr_dest = m_rawImageCircBuff + m_currImage * m_width * m_height * m_typeSize;
1021  char *curr_src = (char *)image.array.raw + curr_image * m_width * m_height * m_typeSize;
1022 
1023  memcpy(curr_dest, curr_src, m_width * m_height * m_typeSize);
1024 
1025  uint64_t *curr_timing = m_timingCircBuff + 5 * m_currImage;
1026 
1027  if (image.cntarray)
1028  {
1029  curr_timing[0] = image.cntarray[curr_image];
1030  curr_timing[1] = image.atimearray[curr_image].tv_sec;
1031  curr_timing[2] = image.atimearray[curr_image].tv_nsec;
1032  curr_timing[3] = image.writetimearray[curr_image].tv_sec;
1033  curr_timing[4] = image.writetimearray[curr_image].tv_nsec;
1034  }
1035  else
1036  {
1037  curr_timing[0] = image.md[0].cnt0;
1038  curr_timing[1] = image.md[0].atime.tv_sec;
1039  curr_timing[2] = image.md[0].atime.tv_nsec;
1040  curr_timing[3] = image.md[0].writetime.tv_sec;
1041  curr_timing[4] = image.md[0].writetime.tv_nsec;
1042  }
1043 
1044  // Check if we need to time-stamp ourselves -- for old cacao streams
1045  if (curr_timing[1] == 0)
1046  {
1047 
1048  if (clock_gettime(CLOCK_REALTIME, &missing_ts) < 0)
1049  {
1050  log<software_critical>({__FILE__, __LINE__, errno, 0, "clock_gettime"});
1051  return;
1052  }
1053 
1054  curr_timing[1] = missing_ts.tv_sec;
1055  curr_timing[2] = missing_ts.tv_nsec;
1056  }
1057 
1058  // just set w-time to a-time if it's missing
1059  if (curr_timing[3] == 0)
1060  {
1061  curr_timing[3] = curr_timing[1];
1062  curr_timing[4] = curr_timing[2];
1063  }
1064 
1065  m_currImageTime = 1.0 * curr_timing[3] + (1.0 * curr_timing[4]) / 1e9;
1066 
1067  if (m_shutdown && m_writing == WRITING)
1068  {
1070  }
1071 
1072  switch (m_writing)
1073  {
1074  case START_WRITING:
1075 
1079 
1080  if(!restartWriting) //We only log if this is really a start
1081  {
1082  log<saving_start>({1, new_cnt0});
1083  }
1084  else //on a restart after a timeout we don't log
1085  {
1086  restartWriting = false;
1087  }
1088 
1089  m_writing = WRITING;
1090 
1091  // fall through
1092  case WRITING:
1094  {
1097  m_currSaveStopFrameNo = new_cnt0;
1098 
1099  #ifdef SW_DEBUG
1100  std::cerr << __FILE__ << " " << __LINE__ << " WRITING " << m_currImage << " "
1101  << m_nextChunkStart << " "
1102  << (m_currImage - m_nextChunkStart == m_writeChunkLength - 1) << " "
1104  << new_cnt0 << "\n";
1105  #endif
1106 
1107  // Now tell the writer to get going
1108  if (sem_post(&m_swSemaphore) < 0)
1109  {
1110  log<software_critical>({__FILE__, __LINE__, errno, 0, "Error posting to semaphore"});
1111  return;
1112  }
1113 
1116  {
1117  m_nextChunkStart = 0;
1118  }
1119 
1122  }
1124  {
1127  m_currSaveStopFrameNo = new_cnt0;
1128 
1129  #ifdef SW_DEBUG
1130  std::cerr << __FILE__ << " " << __LINE__ << " IMAGE TIME WRITING " << m_currImage << " "
1131  << m_nextChunkStart << " "
1132  << (m_currImage - m_nextChunkStart == m_writeChunkLength - 1) << " "
1134  << new_cnt0 << "\n";
1135  #endif
1136 
1137  // Now tell the writer to get going
1138  if (sem_post(&m_swSemaphore) < 0)
1139  {
1140  log<software_critical>({__FILE__, __LINE__, errno, 0, "Error posting to semaphore"});
1141  return;
1142  }
1143 
1145  restartWriting = true;
1146 
1147  }
1148  break;
1149 
1150  case STOP_WRITING:
1153  m_currSaveStopFrameNo = new_cnt0;
1154 
1155  #ifdef SW_DEBUG
1156  std::cerr << __FILE__ << " " << __LINE__ << " STOP_WRITING\n";
1157  #endif
1158 
1159  // Now tell the writer to get going
1160  if (sem_post(&m_swSemaphore) < 0)
1161  {
1162  log<software_critical>({__FILE__, __LINE__, errno, 0, "Error posting to semaphore"});
1163  return;
1164  }
1165  restartWriting = false;
1166  break;
1167 
1168  default:
1169  break;
1170  }
1171 
1172  ++m_currImage;
1174  {
1175  m_currImage = 0;
1176  }
1177  }
1178  else
1179  {
1180  // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1181  //we can also get here if a signal interrupts the sem wait which is triggered by INDI callbacks
1182  switch (m_writing)
1183  {
1184  case WRITING:
1185  // Here, if there is at least 1 image, we check for delta-time > m_maxChunkTime
1186  // then write
1187  if ((m_currImage - m_nextChunkStart > 0) && (mx::sys::get_curr_time() - m_currChunkStartTime > m_maxChunkTime))
1188  {
1191  m_currSaveStopFrameNo = last_cnt0;
1192 
1193  #ifdef SW_DEBUG
1194  std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT WRITING " << " "
1195  << m_currImage << " " << m_nextChunkStart << " " <<(m_currImage - m_nextChunkStart) << " "
1196  << last_cnt0 << "\n";
1197  #endif
1198 
1199  // Now tell the writer to get going
1200  if (sem_post(&m_swSemaphore) < 0)
1201  {
1202  log<software_critical>({__FILE__, __LINE__, errno, 0, "Error posting to semaphore"});
1203  return;
1204  }
1205 
1207  restartWriting = true;
1208 
1209  }
1210  break;
1211  case STOP_WRITING:
1212  // If we timed-out while STOP_WRITING is set, we trigger a write.
1215  m_currSaveStopFrameNo = last_cnt0;
1216 
1217  #ifdef SW_DEBUG
1218  std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT STOP_WRITING\n";
1219  #endif
1220 
1221  // Now tell the writer to get going
1222  if (sem_post(&m_swSemaphore) < 0)
1223  {
1224  log<software_critical>({__FILE__, __LINE__, errno, 0, "Error posting to semaphore"});
1225  return;
1226  }
1227  restartWriting = false;
1228  break;
1229  default:
1230  break;
1231  }
1232 
1233  if (image.md[0].sem <= 0)
1234  {
1235  break; // Indicates that the server has cleaned up.
1236  }
1237 
1238  // Check for why we timed out
1239  if (errno == EINTR)
1240  {
1241  break; // This will indicate time to shutdown, loop will exit normally flags set.
1242  }
1243 
1244  // ETIMEDOUT just means we should wait more.
1245  // Otherwise, report an error.
1246  if (errno != ETIMEDOUT)
1247  {
1248  log<software_error>({__FILE__, __LINE__, errno, "sem_timedwait"});
1249  break;
1250  }
1251 
1252  // Check if the file has disappeared.
1253  int SM_fd;
1254  char SM_fname[200];
1255  ImageStreamIO_filename(SM_fname, sizeof(SM_fname), m_shmimName.c_str());
1256  SM_fd = open(SM_fname, O_RDWR);
1257  if (SM_fd == -1)
1258  {
1259  m_restart = true;
1260  }
1261  close(SM_fd);
1262 
1263  // Check if the inode changed
1264  struct stat buffer;
1265  int rv = stat(SM_fname, &buffer);
1266  if (rv != 0)
1267  {
1268  m_restart = true;
1269  }
1270 
1271  if (buffer.st_ino != inode)
1272  {
1273  #ifdef SW_DEBUG
1274  std::cerr << "Restarting due to inode . . . \n";
1275  #endif
1276  m_restart = true;
1277  }
1278  }
1279  }
1280 
1281  ///\todo might still be writing here, so must check
1282  // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1284  {
1285  // Here, if there is at least 1 image, then write
1286  if ((m_currImage - m_nextChunkStart > 0))
1287  {
1290  m_currSaveStopFrameNo = last_cnt0;
1291 
1293 
1294  std::cerr << __FILE__ << " " << __LINE__ << " WRITING ON RESTART " << last_cnt0 << "\n";
1295  // Now tell the writer to get going
1296  if (sem_post(&m_swSemaphore) < 0)
1297  {
1298  log<software_critical>({__FILE__, __LINE__, errno, 0, "Error posting to semaphore"});
1299  return;
1300  }
1301  }
1302  else
1303  {
1305  }
1306 
1307 
1308  while(m_writing != NOT_WRITING)
1309  {
1310  std::cerr << __FILE__ << " " << __LINE__ << " WAITING TO FINISH WRITING " << last_cnt0 << "\n";
1311  sleep(1);
1312  }
1313  }
1314 
1315  if (m_rawImageCircBuff)
1316  {
1317  free(m_rawImageCircBuff);
1318  m_rawImageCircBuff = 0;
1319  }
1320 
1321  if (m_timingCircBuff)
1322  {
1323  free(m_timingCircBuff);
1324  m_timingCircBuff = 0;
1325  }
1326 
1327  if (opened)
1328  {
1329  if (m_semaphoreNumber >= 0)
1330  {
1331  ///\todo is this release necessary with closeIM?
1332  image.semReadPID[m_semaphoreNumber] = 0; // release semaphore
1333  }
1334  ImageStreamIO_closeIm(&image);
1335  opened = false;
1336  }
1337 
1338  } // outer loop, will exit if m_shutdown==true
1339 
1340  // One more check
1341  if (m_rawImageCircBuff)
1342  {
1343  free(m_rawImageCircBuff);
1344  m_rawImageCircBuff = 0;
1345  }
1346 
1347  if (m_timingCircBuff)
1348  {
1349  free(m_timingCircBuff);
1350  m_timingCircBuff = 0;
1351  }
1352 
1353  if (opened)
1354  {
1355  if (m_semaphoreNumber >= 0)
1356  {
1357  ///\todo is this release necessary with closeIM?
1358  image.semReadPID[m_semaphoreNumber] = 0; // release semaphore.
1359  }
1360 
1361  ImageStreamIO_closeIm(&image);
1362  }
1363 }
1364 
1366 {
1367  s->swThreadExec();
1368 }
1369 
1371 {
1372  m_swThreadID = syscall(SYS_gettid);
1373 
1374  // Wait fpr the thread starter to finish initializing this thread.
1375  while (m_swThreadInit == true && m_shutdown == 0)
1376  {
1377  sleep(1);
1378  }
1379 
1380  while (!m_shutdown)
1381  {
1382  while (!shutdown() && (!(state() == stateCodes::READY || state() == stateCodes::OPERATING)))
1383  {
1384  if (m_fname)
1385  {
1386  free(m_fname);
1387  m_fname = nullptr;
1388  }
1389  sleep(1);
1390  }
1391 
1392  if (shutdown())
1393  {
1394  break;
1395  }
1396 
1397  // This will happen after a reconnection, and could update m_shmimName, etc.
1398  if (m_fname == nullptr)
1399  {
1400  m_fnameBase = m_rawimageDir + "/" + m_outName + "_";
1401 
1402  m_fnameSz = m_fnameBase.size() + sizeof("YYYYMMDDHHMMSSNNNNNNNNN.xrif"); // the sizeof includes the \0
1403  m_fname = (char *)malloc(m_fnameSz);
1404 
1405  snprintf(m_fname, m_fnameSz, "%sYYYYMMDDHHMMSSNNNNNNNNN.xrif", m_fnameBase.c_str());
1406  }
1407 
1408  // at this point fname is not null.
1409 
1410  timespec ts;
1411 
1412  if (clock_gettime(CLOCK_REALTIME, &ts) < 0)
1413  {
1414  log<software_critical>({__FILE__, __LINE__, errno, 0, "clock_gettime"});
1415 
1416  free(m_fname);
1417  m_fname = nullptr;
1418 
1419  return; // will trigger a shutdown
1420  }
1421 
1422  mx::sys::timespecAddNsec(ts, m_semWaitNSec);
1423 
1424  if (sem_timedwait(&m_swSemaphore, &ts) == 0)
1425  {
1426  if (doEncode() < 0)
1427  {
1428  log<software_critical>({__FILE__, __LINE__, "error encoding data"});
1429  return;
1430  }
1431  // Otherwise, success, and we just go on.
1432  }
1433  else
1434  {
1435  // Check for why we timed out
1436  if (errno == EINTR)
1437  {
1438  continue; // This will probably indicate time to shutdown, loop will exit normally if flags set.
1439  }
1440 
1441  // ETIMEDOUT just means we should wait more.
1442  // Otherwise, report an error.
1443  if (errno != ETIMEDOUT)
1444  {
1445  log<software_error>({__FILE__, __LINE__, errno, "sem_timedwait"});
1446  break;
1447  }
1448  }
1449  } // outer loop, will exit if m_shutdown==true
1450 
1451  if (m_fname)
1452  {
1453  free(m_fname);
1454  m_fname = nullptr;
1455  }
1456 }
1457 
1459 {
1460  if (m_writing == NOT_WRITING)
1461  {
1462  return 0;
1463  }
1464 
1465  recordSavingState(true);
1466 
1467  // Record these to prevent a change in other thread
1468  uint64_t saveStart = m_currSaveStart;
1469  uint64_t saveStopFrameNo = m_currSaveStopFrameNo;
1470  size_t nFrames = m_currSaveStop - saveStart;
1471  size_t nBytes = m_width * m_height * m_typeSize;
1472 
1473  #ifdef SW_DEBUG
1474  std::cerr << "nFrames: " << nFrames << "\n";
1475  #endif
1476 
1477  // Configure xrif and copy image data -- this does no allocations
1478  int rv = xrif_set_size(m_xrif, m_width, m_height, 1, nFrames, m_dataType);
1479  if (rv != XRIF_NOERROR)
1480  {
1481  // This is a big problem. Report it as "ALERT" and go on.
1482  log<software_alert>({__FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST"});
1483  }
1484 
1485  rv = xrif_set_lz4_acceleration(m_xrif, m_lz4accel);
1486  if (rv != XRIF_NOERROR)
1487  {
1488  // This may just be out of range, it's only an error.
1489  log<software_error>({__FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error."});
1490  }
1491 
1492  memcpy(m_xrif->raw_buffer, m_rawImageCircBuff + saveStart * nBytes, nFrames * nBytes);
1493 
1494  // Configure xrif and copy timing data -- no allocations
1495  rv = xrif_set_size(m_xrif_timing, 5, 1, 1, nFrames, XRIF_TYPECODE_UINT64);
1496  if (rv != XRIF_NOERROR)
1497  {
1498  // This is a big problem. Report it as "ALERT" and go on.
1499  log<software_alert>({__FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST."});
1500  }
1501 
1502  rv = xrif_set_lz4_acceleration(m_xrif_timing, m_lz4accel);
1503  if (rv != XRIF_NOERROR)
1504  {
1505  // This may just be out of range, it's only an error.
1506  log<software_error>({__FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error."});
1507  }
1508 
1509  #ifdef SW_DEBUG
1510  for (size_t nF = 0; nF < nFrames; ++nF)
1511  {
1512  std::cerr << " " << (m_timingCircBuff + saveStart * 5 + nF * 5)[0] << "\n";
1513  }
1514  #endif
1515 
1516  memcpy(m_xrif_timing->raw_buffer, m_timingCircBuff + saveStart * 5, nFrames * 5 * sizeof(uint64_t));
1517 
1518  rv = xrif_encode(m_xrif);
1519  if (rv != XRIF_NOERROR)
1520  {
1521  // This is a big problem. Report it as "ALERT" and go on.
1522  log<software_alert>({__FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST."});
1523  }
1524 
1525  rv = xrif_write_header(m_xrif_header, m_xrif);
1526  if (rv != XRIF_NOERROR)
1527  {
1528  // This is a big problem. Report it as "ALERT" and go on.
1529  log<software_alert>({__FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST."});
1530  }
1531 
1532  rv = xrif_encode(m_xrif_timing);
1533  if (rv != XRIF_NOERROR)
1534  {
1535  // This is a big problem. Report it as "ALERT" and go on.
1536  log<software_alert>({__FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST."});
1537  }
1538 
1539  rv = xrif_write_header(m_xrif_timing_header, m_xrif_timing);
1540  if (rv != XRIF_NOERROR)
1541  {
1542  // This is a big problem. Report it as "ALERT" and go on.
1543  log<software_alert>({__FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST"});
1544  }
1545 
1546  // Now break down the acq time of the first image in the buffer for use in file name
1547  tm uttime; // The broken down time.
1548  timespec *fts = (timespec *)(m_timingCircBuff + saveStart * 5 + 1);
1549 
1550  if (gmtime_r(&fts->tv_sec, &uttime) == 0)
1551  {
1552  // Yell at operator but keep going
1553  log<software_alert>({__FILE__, __LINE__, errno, 0, "gmtime_r error. possible loss of timing information."});
1554  }
1555 
1556  // Available size = m_fnameSz-m_fnameBase.size(), rather than assuming sizeof("YYYYMMDDHHMMSSNNNNNNNNN"), in case we screwed up somewhere.
1557  rv = snprintf(m_fname + m_fnameBase.size(), m_fnameSz - m_fnameBase.size(), "%04i%02i%02i%02i%02i%02i%09i", uttime.tm_year + 1900,
1558  uttime.tm_mon + 1, uttime.tm_mday, uttime.tm_hour, uttime.tm_min, uttime.tm_sec, static_cast<int>(fts->tv_nsec));
1559 
1560  if (rv != sizeof("YYYYMMDDHHMMSSNNNNNNNNN") - 1)
1561  {
1562  // Something is very wrong. Keep going to try to get it on disk.
1563  log<software_alert>({__FILE__, __LINE__, errno, rv, "did not write enough chars to timestamp"});
1564  }
1565 
1566  // Cover up the \0 inserted by snprintf
1567  (m_fname + m_fnameBase.size())[23] = '.';
1568 
1569  FILE *fp_xrif = fopen(m_fname, "wb");
1570  if (fp_xrif == NULL)
1571  {
1572  // This is it. If we can't write data to disk need to fix.
1573  log<software_alert>({__FILE__, __LINE__, errno, 0, "failed to open file for writing"});
1574 
1575  free(m_fname);
1576  m_fname = nullptr;
1577 
1578  return -1; // will trigger a shutdown
1579  }
1580 
1581  size_t bw = fwrite(m_xrif_header, sizeof(uint8_t), XRIF_HEADER_SIZE, fp_xrif);
1582 
1583  if (bw != XRIF_HEADER_SIZE)
1584  {
1585  log<software_alert>({__FILE__, __LINE__, errno, 0, "failure writing header to file. DATA LOSS LIKELY. bytes = " + std::to_string(bw)});
1586  // We go on . . .
1587  }
1588 
1589  bw = fwrite(m_xrif->raw_buffer, sizeof(uint8_t), m_xrif->compressed_size, fp_xrif);
1590 
1591  if (bw != m_xrif->compressed_size)
1592  {
1593  log<software_alert>({__FILE__, __LINE__, errno, 0, "failure writing data to file. DATA LOSS LIKELY. bytes = " + std::to_string(bw)});
1594  }
1595 
1596  bw = fwrite(m_xrif_timing_header, sizeof(uint8_t), XRIF_HEADER_SIZE, fp_xrif);
1597 
1598  if (bw != XRIF_HEADER_SIZE)
1599  {
1600  log<software_alert>({__FILE__, __LINE__, errno, 0, "failure writing timing header to file. DATA LOSS LIKELY. bytes = " + std::to_string(bw)});
1601  }
1602 
1603  bw = fwrite(m_xrif_timing->raw_buffer, sizeof(uint8_t), m_xrif_timing->compressed_size, fp_xrif);
1604 
1605  if (bw != m_xrif_timing->compressed_size)
1606  {
1607  log<software_alert>({__FILE__, __LINE__, errno, 0, "failure writing timing data to file. DATA LOSS LIKELY. bytes = " + std::to_string(bw)});
1608  }
1609 
1610  fclose(fp_xrif);
1611 
1612  recordSavingStats(true);
1613 
1614  if (m_writing == STOP_WRITING)
1615  {
1617  log<saving_stop>({0, saveStopFrameNo});
1618  }
1619 
1620  recordSavingState(true);
1621 
1622  return 0;
1623 
1624 } // doEncode
1625 
1626 INDI_NEWCALLBACK_DEFN(streamWriter, m_indiP_writing)
1627 (const pcf::IndiProperty &ipRecv)
1628 {
1629  INDI_VALIDATE_CALLBACK_PROPS(m_indiP_writing, ipRecv);
1630 
1631  if (!ipRecv.find("toggle"))
1632  {
1633  return 0;
1634  }
1635 
1636  if (ipRecv["toggle"].getSwitchState() == pcf::IndiElement::Off && (m_writing == WRITING || m_writing == START_WRITING))
1637  {
1638  m_writing = STOP_WRITING;
1639  }
1640 
1641  if (ipRecv["toggle"].getSwitchState() == pcf::IndiElement::On && m_writing == NOT_WRITING)
1642  {
1643  m_writing = START_WRITING;
1644  }
1645 
1646  return 0;
1647 }
1648 
1650 {
1651  // Only update this if not changing
1652  if (m_writing == NOT_WRITING || m_writing == WRITING)
1653  {
1654  if (m_xrif && m_writing == WRITING)
1655  {
1656  indi::updateSwitchIfChanged(m_indiP_writing, "toggle", pcf::IndiElement::On, m_indiDriver, INDI_OK);
1657  indi::updateIfChanged(m_indiP_xrifStats, "ratio", m_xrif->compression_ratio, m_indiDriver, INDI_BUSY);
1658  indi::updateIfChanged(m_indiP_xrifStats, "encodeMBsec", m_xrif->encode_rate / 1048576.0, m_indiDriver, INDI_BUSY);
1660  indi::updateIfChanged(m_indiP_xrifStats, "differenceMBsec", m_xrif->difference_rate / 1048576.0, m_indiDriver, INDI_BUSY);
1661  indi::updateIfChanged(m_indiP_xrifStats, "differenceFPS", m_xrif->difference_rate / (m_width * m_height * m_typeSize), m_indiDriver, INDI_BUSY);
1662  indi::updateIfChanged(m_indiP_xrifStats, "reorderMBsec", m_xrif->reorder_rate / 1048576.0, m_indiDriver, INDI_BUSY);
1664  indi::updateIfChanged(m_indiP_xrifStats, "compressMBsec", m_xrif->compress_rate / 1048576.0, m_indiDriver, INDI_BUSY);
1666  }
1667  else
1668  {
1669  indi::updateSwitchIfChanged(m_indiP_writing, "toggle", pcf::IndiElement::Off, m_indiDriver, INDI_OK);
1679  }
1680  }
1681 }
1682 
1684 {
1686 }
1687 
1689 {
1690  return recordSavingState(true);
1691 }
1692 
1694 {
1695  static int16_t lastState = -1;
1696  static uint64_t currSaveStart = -1;
1697 
1698  int16_t state;
1699  if (m_writing == WRITING || m_writing == START_WRITING || m_writing == STOP_WRITING) // Changed from just writing 5/2024
1700  state = 1;
1701  else
1702  state = 0;
1703 
1704  if (state != lastState || m_currSaveStart != currSaveStart || force)
1705  {
1706  telem<telem_saving_state>({state, m_currSaveStart});
1707 
1708  lastState = state;
1709  currSaveStart = m_currSaveStart;
1710  }
1711 
1712  return 0;
1713 }
1714 
1716 {
1717  static uint32_t last_rawSize = -1;
1718  static uint32_t last_compressedSize = -1;
1719  static float last_encodeRate = -1;
1720  static float last_differenceRate = -1;
1721  static float last_reorderRate = -1;
1722  static float last_compressRate = -1;
1723 
1724  if (m_xrif->raw_size != last_rawSize || m_xrif->compressed_size != last_compressedSize || m_xrif->encode_rate != last_encodeRate || m_xrif->difference_rate != last_differenceRate ||
1725  m_xrif->reorder_rate != last_reorderRate || m_xrif->compress_rate != last_compressRate || force)
1726  {
1727  telem<telem_saving>({(uint32_t)m_xrif->raw_size, (uint32_t)m_xrif->compressed_size, (float)m_xrif->encode_rate, (float)m_xrif->difference_rate, (float)m_xrif->reorder_rate, (float)m_xrif->compress_rate});
1728 
1729  last_rawSize = m_xrif->raw_size;
1730  last_compressedSize = m_xrif->compressed_size;
1731  last_encodeRate = m_xrif->encode_rate;
1732  last_differenceRate = m_xrif->difference_rate;
1733  last_reorderRate = m_xrif->reorder_rate;
1734  last_compressRate = m_xrif->compress_rate;
1735  }
1736 
1737  return 0;
1738 }
1739 
1740 } // namespace app
1741 } // namespace MagAOX
1742 
1743 #endif
The base-class for MagAO-X applications.
Definition: MagAOXApp.hpp:73
stateCodes::stateCodeT state()
Get the current state code.
Definition: MagAOXApp.hpp:2297
int registerIndiPropertyNew(pcf::IndiProperty &prop, int(*)(void *, const pcf::IndiProperty &))
Register an INDI property which is exposed for others to request a New Property for.
int createStandardIndiToggleSw(pcf::IndiProperty &prop, const std::string &name, const std::string &label="", const std::string &group="")
Create a standard R/W INDI switch with a single toggle element.
Definition: MagAOXApp.hpp:2543
int m_shutdown
Flag to signal it's time to shutdown. When not 0, the main loop exits.
Definition: MagAOXApp.hpp:100
int shutdown()
Get the value of the shutdown flag.
Definition: MagAOXApp.hpp:1182
indiDriver< MagAOXApp > * m_indiDriver
The INDI driver wrapper. Constructed and initialized by execute, which starts and stops communication...
Definition: MagAOXApp.hpp:542
static int log(const typename logT::messageT &msg, logPrioT level=logPrio::LOG_DEFAULT)
Make a log entry.
Definition: MagAOXApp.hpp:1804
int threadStart(std::thread &thrd, bool &thrdInit, pid_t &tpid, pcf::IndiProperty &thProp, int thrdPrio, const std::string &cpuset, const std::string &thrdName, thisPtr *thrdThis, Function &&thrdStart)
Start a thread, using this class's privileges to set priority, etc.
Definition: MagAOXApp.hpp:2157
std::string MagAOXPath
The base path of the MagAO-X system.
Definition: MagAOXApp.hpp:81
pcf::IndiProperty m_indiP_xrifStats
pcf::IndiProperty m_indiP_writing
double m_currImageTime
The write-time of the current image.
std::thread m_fgThread
A separate thread for the actual framegrabbings.
int m_fgThreadPrio
Priority of the framegrabber thread, should normally be > 00.
pid_t m_fgThreadID
F.g. thread PID.
size_t m_circBuffLength
The length of the circular buffer, in frames.
int initialize_xrif()
Initialize the xrif system.
int recordTelem(const telem_saving_state *)
uint8_t m_dataType
The ImageStreamIO type code.
unsigned m_semWaitSec
The time in whole sec to wait on the semaphore, to which m_semWaitNSec is added. Default is 0 nsec.
bool m_fgThreadInit
Synchronizer to ensure f.g. thread initializes before doing dangerous things.
int m_typeSize
The pixel byte depth.
xrif_t m_xrif_timing
The xrif compression handle for image data.
uint64_t m_currSaveStart
The circular buffer position at which to start saving.
virtual int appStartup()
Startup functions.
size_t m_writeChunkLength
The number of frames to write at a time.
size_t m_height
The height of the image.
uint64_t m_currSaveStopFrameNo
The frame number of the image at which saving stopped (for logging)
double m_currChunkStartTime
The write-time of the first image in the chunk.
int allocate_circbufs()
Worker function to allocate the circular buffers.
void handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
Handles SIGSEGV and SIGBUS. Sets m_restart to true.
int m_swThreadPrio
Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
int recordSavingStats(bool force=false)
pcf::IndiProperty m_swThreadProp
The property to hold the s.w. thread details.
std::thread m_swThread
A separate thread for the actual writing.
std::string m_fgCpuset
The cpuset for the framegrabber thread. Ignored if empty (the default).
virtual int appLogic()
Implementation of the FSM for the Siglent SDG.
static void fgThreadStart(streamWriter *s)
Thread starter, called by fgThreadStart on thread construction. Calls fgThreadExec.
std::string m_outName
The name to use for outputting files, Default is m_shmimName.
INDI_NEWCALLBACK_DECL(streamWriter, m_indiP_writing)
char * m_xrif_timing_header
Storage for the xrif image data file header.
~streamWriter() noexcept
Destructor.
unsigned m_semWaitNSec
The time in nsec to wait on the semaphore, added to m_semWaitSec. Max is 999999999....
int allocate_xrif()
Worker function to configure and allocate the xrif handles.
int setSigSegvHandler()
Sets the handler for SIGSEGV and SIGBUS.
std::string m_swCpuset
The cpuset for the framegrabber thread. Ignored if empty (the default).
xrif_t m_xrif
The xrif compression handle for image data.
streamWriter()
Default c'tor.
virtual void setupConfig()
Setup the configuration system (called by MagAOXApp::setup())
int m_semaphoreNumber
The image structure semaphore index.
std::string m_rawimageDir
The path where files will be saved.
static void _handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
The handler called when SIGSEGV or SIGBUS is received, which will be due to ImageStreamIO server rese...
int recordSavingState(bool force=false)
int m_writing
Controls whether or not images are being written, and sequences start and stop of writing.
uint64_t m_currChunkStart
The circular buffer starting position of the current to-be-written chunk.
pid_t m_swThreadID
S.w. thread pid.
virtual void loadConfig()
load the configuration system results (called by MagAOXApp::setup())
pcf::IndiProperty m_fgThreadProp
The property to hold the f.g. thread details.
void swThreadExec()
Execute the stream writer main loop.
uint64_t m_nextChunkStart
The circular buffer starting position of the next to-be-written chunk.
static streamWriter * m_selfWriter
Static pointer to this (set in constructor). Used for getting out of the static SIGSEGV handler.
char * m_xrif_header
Storage for the xrif image data file header.
double m_maxChunkTime
The maximum time before writing regardless of number of frames.
int doEncode()
Function called when semaphore is raised to do the encode and write.
static void swThreadStart(streamWriter *s)
Thread starter, called by swThreadStart on thread construction. Calls swThreadExec.
uint64_t m_currSaveStop
The circular buffer position at which to stop saving.
void fgThreadExec()
Execute the frame grabber main loop.
std::string m_shmimName
The name of the shared memory buffer.
dev::telemeter< streamWriter > telemeterT
size_t m_width
The width of the image.
virtual int appShutdown()
Do any needed shutdown tasks. Currently nothing in this app.
sem_t m_swSemaphore
Semaphore used to synchronize the fg thread and the sw thread.
bool m_swThreadInit
Synchronizer to ensure s.w. thread initializes before doing dangerous things.
#define MAGAOX_rawimageRelPath
The relative path to the raw images directory.
Definition: paths.hpp:92
#define REG_INDI_NEWPROP_NOCB(prop, propName, type)
Register a NEW INDI property with the class, with no callback.
Definition: indiMacros.hpp:248
#define INDI_NEWCALLBACK(prop)
Get the name of the static callback wrapper for a new property.
Definition: indiMacros.hpp:208
@ OPERATING
The device is operating, other than homing.
Definition: stateCodes.hpp:55
@ READY
The device is ready for operation, but is not operating.
Definition: stateCodes.hpp:56
#define INDI_IDLE
Definition: indiUtils.hpp:28
#define INDI_BUSY
Definition: indiUtils.hpp:30
#define INDI_OK
Definition: indiUtils.hpp:29
std::ostream & cerr()
void updateIfChanged(pcf::IndiProperty &p, const std::string &el, const T &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
Definition: indiUtils.hpp:95
void updateSwitchIfChanged(pcf::IndiProperty &p, const std::string &el, const pcf::IndiElement::SwitchStateType &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
Definition: indiUtils.hpp:212
INDI_VALIDATE_CALLBACK_PROPS(function, ipRecv)
const pcf::IndiProperty & ipRecv
Definition: MagAOXApp.hpp:3434
INDI_NEWCALLBACK_DEFN(acesxeCtrl, m_indiP_windspeed)(const pcf
Definition: acesxeCtrl.hpp:687
Definition: dm.hpp:24
constexpr static logPrioT LOG_DEBUG
Used for debugging.
Definition: logPriority.hpp:52
constexpr static logPrioT LOG_CRITICAL
The process can not continue and will shut down (fatal)
Definition: logPriority.hpp:37
constexpr static logPrioT LOG_WARNING
A condition has occurred which may become an error, but the process continues.
Definition: logPriority.hpp:43
constexpr static logPrioT LOG_NOTICE
A normal but significant condition.
Definition: logPriority.hpp:46
#define XWC_SEM_WAIT_TS_RETVOID(ts, sec, nsec)
Add the wait time to a timespec for a sem_timedwait call, with no value returned on error.
Definition: semUtils.hpp:20
#define NOT_WRITING
#define STOP_WRITING
#define WRITING
#define START_WRITING
A device base class which saves telemetry.
Definition: telemeter.hpp:69
int appShutdown()
Perform telemeter application shutdown.
Definition: telemeter.hpp:274
int loadConfig(appConfigurator &config)
Load the device section from an application configurator.
Definition: telemeter.hpp:223
int appLogic()
Perform telemeter application logic.
Definition: telemeter.hpp:268
int setupConfig(appConfigurator &config)
Setup an application configurator for the device section.
Definition: telemeter.hpp:211
int appStartup()
Starts the telemetry log thread.
Definition: telemeter.hpp:241
int checkRecordTimes(const telT &tel, telTs... tels)
Check the time of the last record for each telemetry type and make an entry if needed.
Definition: telemeter.hpp:281
Software CRITICAL log entry.
Software ERR log entry.