API
 
Loading...
Searching...
No Matches
streamWriter.hpp
Go to the documentation of this file.
1/** \file streamWriter.hpp
2 * \brief The MagAO-X Image Stream Writer
3 *
4 * \author Jared R. Males (jaredmales@gmail.com)
5 *
6 * \ingroup streamWriter_files
7 */
8
9#ifndef streamWriter_hpp
10#define streamWriter_hpp
11
12#include <ImageStreamIO/ImageStruct.h>
13#include <ImageStreamIO/ImageStreamIO.h>
14
15#include <xrif/xrif.h>
16
17#include <mx/sys/timeUtils.hpp>
18
19#include "../../libMagAOX/libMagAOX.hpp" //Note this is included on command line to trigger pch
20#include "../../magaox_git_version.h"
21
22#define NOT_WRITING ( 0 )
23#define START_WRITING ( 1 )
24#define WRITING ( 2 )
25#define STOP_WRITING ( 3 )
26
27//#define SW_DEBUG
28
29namespace MagAOX
30{
31namespace app
32{
33
34/** \defgroup streamWriter ImageStreamIO Stream Writing
35 * \brief Writes the contents of an ImageStreamIO image stream to disk.
36 *
37 * <a href="../handbook/operating/software/apps/streamWriter.html">Application Documentation</a>
38 *
39 * \ingroup apps
40 *
41 */
42
43/** \defgroup streamWriter_files ImageStreamIO Stream Writing
44 * \ingroup streamWriter
45 */
46
47/** MagAO-X application to control writing ImageStreamIO streams to disk.
48 *
49 * \ingroup streamWriter
50 *
51 */
52class streamWriter : public MagAOXApp<>, public dev::telemeter<streamWriter>
53{
55
56 friend class dev::telemeter<streamWriter>;
57
58 // Give the test harness access.
59 friend class streamWriter_test;
60
61 protected:
62 /** \name configurable parameters
63 *@{
64 */
65
66 std::string m_rawimageDir; ///< The path where files will be saved.
67
68 size_t m_maxCircBuffLength{ 1024 }; ///< The maximum length of the circular buffer, in frames
69
70 double m_maxCircBuffSize{ 2048 }; ///< The maximum size of the circular buffer in MB.
71
72 size_t m_maxWriteChunkLength{ 512 }; /**< The maximum number of frames to write at a time. Must
73 be an integer factor of m_maxCircBuffLength.*/
74
75 double m_maxChunkTime{ 10 }; ///< The maximum time before writing regardless of number of frames.
76
77 std::string m_shmimName; ///< The name of the shared memory buffer.
78
79 std::string m_outName; ///< The name to use for outputting files, Default is m_shmimName.
80
81 int m_semaphoreNumber{ 7 }; ///< The image structure semaphore index.
82
83 unsigned m_semWaitSec{
84 0 }; ///< The time in whole sec to wait on the semaphore, to which m_semWaitNSec is added. Default is 0 sec.
85
86 unsigned m_semWaitNSec{ 500000000 }; ///< The time in nsec to wait on the semaphore, added to m_semWaitSec. Max is
87 ///< 999999999. Default is 5e8 nsec.
88
89 int m_lz4accel{ 1 };
90
91 bool m_compress{ true };
92
93 ///@}
94
95 size_t m_circBuffLength{ 1024 }; ///< The length of the circular buffer, in frames
96 double m_circBuffSize{ 2048.0 }; ///< The size of the circular buffer, in MB
97 size_t m_writeChunkLength{ 512 }; ///< The number of frames to write at a time
98
99 size_t m_width{ 0 }; ///< The width of the image
100 size_t m_height{ 0 }; ///< The height of the image
101 uint8_t m_dataType{ 0 }; ///< The ImageStreamIO type code.
102 int m_typeSize{ 0 }; ///< The pixel byte depth
103
104 char *m_rawImageCircBuff{ nullptr };
106
107 size_t m_currImage{ 0 };
108
109 double m_currImageTime{ 0 }; ///< The write-time of the current image
110
111 double m_currChunkStartTime{ 0 }; ///< The write-time of the first image in the chunk
112
113 // Writer book-keeping:
115 NOT_WRITING }; ///< Controls whether or not images are being written, and sequences start and stop of writing.
116
117 uint64_t m_currChunkStart{ 0 }; ///< The circular buffer starting position of the current to-be-written chunk.
118 uint64_t m_nextChunkStart{ 0 }; ///< The circular buffer starting position of the next to-be-written chunk.
119
120 uint64_t m_currSaveStart{ 0 }; ///< The circular buffer position at which to start saving.
121 uint64_t m_currSaveStop{ 0 }; ///< The circular buffer position at which to stop saving.
122
123 uint64_t m_currSaveStopFrameNo{ 0 }; ///< The frame number of the image at which saving stopped (for logging)
124
125 /// The xrif compression handle for image data
126 xrif_t m_xrif{ nullptr };
127
128 /// Storage for the xrif image data file header
129 char *m_xrif_header{ nullptr };
130
131 /// The xrif compression handle for image data
133
134 /// Storage for the xrif timing data file header
135 char *m_xrif_timing_header{ nullptr };
136
137 public:
138 /// Default c'tor
139 streamWriter();
140
141 /// Destructor
143
144 /// Setup the configuration system (called by MagAOXApp::setup())
145 virtual void setupConfig();
146
147 /// load the configuration system results (called by MagAOXApp::setup())
148 virtual void loadConfig();
149
150 /// Startup functions
151 /** Sets up the INDI vars.
152 *
153 */
154 virtual int appStartup();
155
156 /// Main application logic: checks that the framegrabber and stream writer threads are still running, then updates INDI.
157 virtual int appLogic();
158
159 /// Do any needed shutdown tasks: joins the framegrabber and stream writer threads and clears the xrif handles.
160 virtual int appShutdown();
161
162 protected:
163 /** \name SIGSEGV & SIGBUS signal handling
164 * These signals occur as a result of an ImageStreamIO source server resetting (e.g. changing frame sizes).
165 * When they occur a restart of the framegrabber and framewriter thread main loops is triggered.
166 *
167 * @{
168 */
169 bool m_restart{ false };
170
171 static streamWriter *m_selfWriter; ///< Static pointer to this (set in constructor). Used for getting out of the
172 ///< static SIGSEGV handler.
173
174 /// Initialize the xrif system.
175 /** Allocates the handles and headers pointers.
176 *
177 * \returns 0 on success.
178 * \returns -1 on error.
179 */
180 int initialize_xrif();
181
182 /// Sets the handler for SIGSEGV and SIGBUS
183 /** These are caused by ImageStreamIO server resets.
184 */
185 int setSigSegvHandler();
186
187 /// The handler called when SIGSEGV or SIGBUS is received, which will be due to ImageStreamIO server resets. Just a
188 /// wrapper for handlerSigSegv.
189 static void _handlerSigSegv( int signum, siginfo_t *siginf, void *ucont );
190
191 /// Handles SIGSEGV and SIGBUS. Sets m_restart to true.
192 void handlerSigSegv( int signum, siginfo_t *siginf, void *ucont );
193 ///@}
194
195 /** \name Framegrabber Thread
196 * This thread monitors the ImageStreamIO buffer and copies its images to the circular buffer.
197 *
198 * @{
199 */
200 int m_fgThreadPrio{ 1 }; ///< Priority of the framegrabber thread, should normally be > 0.
201
202 std::string m_fgCpuset; ///< The cpuset for the framegrabber thread. Ignored if empty (the default).
203
204 std::thread m_fgThread; ///< A separate thread for the actual framegrabbings
205
206 bool m_fgThreadInit{ true }; ///< Synchronizer to ensure f.g. thread initializes before doing dangerous things.
207
208 pid_t m_fgThreadID{ 0 }; ///< F.g. thread PID.
209
210 pcf::IndiProperty m_fgThreadProp; ///< The property to hold the f.g. thread details.
211
212 public:
213 static void getCircBuffLengths( size_t &circBuffLength,
214 double &circBuffSize,
215 size_t &writeChunkLength,
216 size_t maxCircBuffLength,
217 double maxCircBuffSize,
218 size_t maxWriteChunkLength,
219 uint32_t width,
220 uint32_t height,
221 size_t typeSize );
222
223 protected:
224 /// Worker function to allocate the circular buffers.
225 /** This takes place in the fg thread after connecting to the stream.
226 *
227 * \returns 0 on success.
228 * \returns -1 on error.
229 */
230 int allocate_circbufs();
231
232 /// Worker function to configure and allocate the xrif handles.
233 /** This takes place in the fg thread after connecting to the stream.
234 *
235 * \returns 0 on success.
236 * \returns -1 on error.
237 */
238 int allocate_xrif();
239
240 /// Thread starter for the framegrabber thread, run on thread construction. Calls fgThreadExec.
241 static void fgThreadStart( streamWriter *s /**< [in] a pointer to a streamWriter instance (normally this) */ );
242
243 /// Execute the frame grabber main loop.
244 void fgThreadExec();
245
246 ///@}
247
248 /** \name Stream Writer Thread
249 * This thread writes chunks of the circular buffer to disk.
250 *
251 * @{
252 */
253 int m_swThreadPrio{ 1 }; ///< Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
254
255 std::string m_swCpuset; ///< The cpuset for the stream writer thread. Ignored if empty (the default).
256
257 sem_t m_swSemaphore; ///< Semaphore used to synchronize the fg thread and the sw thread.
258
259 std::thread m_swThread; ///< A separate thread for the actual writing
260
261 bool m_swThreadInit{ true }; ///< Synchronizer to ensure s.w. thread initializes before doing dangerous things.
262
263 pid_t m_swThreadID{ 0 }; ///< S.w. thread pid.
264
265 pcf::IndiProperty m_swThreadProp; ///< The property to hold the s.w. thread details.
266
267 size_t m_fnameSz{ 0 };
268
269 char *m_fname{ nullptr };
270
271 std::string m_fnameBase;
272
273 /// Thread starter for the stream writer thread, run on thread construction. Calls swThreadExec.
274 static void swThreadStart( streamWriter *s /**< [in] a pointer to a streamWriter instance (normally this) */ );
275
276 /// Execute the stream writer main loop.
277 void swThreadExec();
278
279 /// Function called when semaphore is raised to do the encode and write.
280 int doEncode();
281 ///@}
282
283 // INDI:
284 protected:
285 // declare our properties
286 pcf::IndiProperty m_indiP_writing;
287
288 pcf::IndiProperty m_indiP_xrifStats;
289
290 public:
292
293 void updateINDI();
294
295 /** \name Telemeter Interface
296 *
297 * @{
298 */
299 int checkRecordTimes();
300
301 int recordTelem( const telem_saving_state * );
302
303 int recordSavingState( bool force = false );
304 int recordSavingStats( bool force = false );
305
306 ///@}
307};
308
309// Set self pointer to null so app starts up uninitialized.
311
/// Default constructor.
/** Passes the MAGAOX_CURRENT_SHA1 and MAGAOX_REPO_MODIFIED version macros to the MagAOXApp base,
 * sets m_powerMgtEnabled to false, and stores `this` in the static m_selfWriter pointer.
 */
312streamWriter::streamWriter() : MagAOXApp( MAGAOX_CURRENT_SHA1, MAGAOX_REPO_MODIFIED )
313{
314 m_powerMgtEnabled = false; // presumably opts this app out of MagAOXApp power management -- confirm against MagAOXApp
315
316 m_selfWriter = this; // m_selfWriter is documented as the way out of the static SIGSEGV handler
317
318 return;
319}
320
322{
323 if( m_xrif )
325
326 if( m_xrif_header )
328
329 if( m_xrif_timing )
331
334
335 return;
336}
337
339{
340 config.add( "writer.savePath",
341 "",
342 "writer.savePath",
343 argType::Required,
344 "writer",
345 "savePath",
346 false,
347 "string",
348 "The absolute path where images are saved. Will use MagAO-X default if not set." );
349
350 config.add( "writer.maxCircBuffLength",
351 "",
352 "writer.maxCircBuffLength",
353 argType::Required,
354 "writer",
355 "maxCircBuffLength",
356 false,
357 "size_t",
358 "The maximum length in frames of the circular buffer. Should be an integer multiple of and larger than "
359 "maxWriteChunkLength." );
360
361 config.add( "writer.maxCircBuffSize",
362 "",
363 "writer.maxCircBuffSize",
364 argType::Required,
365 "writer",
366 "maxCircBuffSize",
367 false,
368 "double",
369 "The maximum size in MB of the circular buffer. Should be sized to hold at least 2 of the maximum "
370 "frame size." );
371
372 config.add(
373 "writer.maxWriteChunkLength",
374 "",
375 "writer.maxWriteChunkLength",
376 argType::Required,
377 "writer",
378 "maxWriteChunkLength",
379 false,
380 "size_t",
381 "The maximum length in frames of the chunks to write to disk. Should be smaller than maxCircBuffLength." );
382
383 config.add( "writer.maxChunkTime",
384 "",
385 "writer.maxChunkTime",
386 argType::Required,
387 "writer",
388 "maxChunkTime",
389 false,
390 "float",
391 "The max length in seconds of the chunks to write to disk. Default is 60 sec." );
392
393 config.add( "writer.threadPrio",
394 "",
395 "writer.threadPrio",
396 argType::Required,
397 "writer",
398 "threadPrio",
399 false,
400 "int",
401 "The real-time priority of the stream writer thread." );
402
403 config.add( "writer.cpuset",
404 "",
405 "writer.cpuset",
406 argType::Required,
407 "writer",
408 "cpuset",
409 false,
410 "int",
411 "The cpuset for the writer thread." );
412
413 config.add( "writer.compress",
414 "",
415 "writer.compress",
416 argType::Required,
417 "writer",
418 "compress",
419 false,
420 "bool",
421 "Flag to set whether compression is used. Default true." );
422
423 config.add( "writer.lz4accel",
424 "",
425 "writer.lz4accel",
426 argType::Required,
427 "writer",
428 "lz4accel",
429 false,
430 "int",
431 "The LZ4 acceleration parameter. Larger is faster, but lower compression." );
432
433 config.add( "writer.outName",
434 "",
435 "writer.outName",
436 argType::Required,
437 "writer",
438 "outName",
439 false,
440 "int",
441 "The name to use for output files. Default is the shmimName." );
442
443 config.add( "framegrabber.shmimName",
444 "",
445 "framegrabber.shmimName",
446 argType::Required,
447 "framegrabber",
448 "shmimName",
449 false,
450 "int",
451 "The name of the stream to monitor. From /tmp/shmimName.im.shm." );
452
453 config.add( "framegrabber.semaphoreNumber",
454 "",
455 "framegrabber.semaphoreNumber",
456 argType::Required,
457 "framegrabber",
458 "semaphoreNumber",
459 false,
460 "int",
461 "The semaphore to wait on. Default is 7." );
462
463 config.add( "framegrabber.semWait",
464 "",
465 "framegrabber.semWait",
466 argType::Required,
467 "framegrabber",
468 "semWait",
469 false,
470 "int",
471 "The time in nsec to wait on the semaphore. Max is 999999999. Default is 5e8 nsec." );
472
473 config.add( "framegrabber.threadPrio",
474 "",
475 "framegrabber.threadPrio",
476 argType::Required,
477 "framegrabber",
478 "threadPrio",
479 false,
480 "int",
481 "The real-time priority of the framegrabber thread." );
482
483 config.add( "framegrabber.cpuset",
484 "",
485 "framegrabber.cpuset",
486 argType::Required,
487 "framegrabber",
488 "cpuset",
489 false,
490 "string",
491 "The cpuset for the framegrabber thread." );
492
493 telemeterT::setupConfig( config );
494}
495
497{
498
499 config( m_maxCircBuffLength, "writer.maxCircBuffLength" );
500 config( m_maxCircBuffSize, "writer.maxCircBuffSize" );
501 config( m_maxWriteChunkLength, "writer.maxWriteChunkLength" );
502 config( m_maxChunkTime, "writer.maxChunkTime" );
503 config( m_swThreadPrio, "writer.threadPrio" );
504 config( m_swCpuset, "writer.cpuset" );
505 config( m_compress, "writer.compress" );
506 config( m_lz4accel, "writer.lz4accel" );
508 {
510 }
512 {
514 }
515
516 config( m_shmimName, "framegrabber.shmimName" );
517
519 config( m_outName, "writer.outName" );
520
521 config( m_semaphoreNumber, "framegrabber.semaphoreNumber" );
522 config( m_semWaitNSec, "framegrabber.semWait" );
523
524 config( m_fgThreadPrio, "framegrabber.threadPrio" );
525 config( m_fgCpuset, "framegrabber.cpuset" );
526
527 // Set some defaults
528 // Setup default saving path
529 std::string tmpstr = mx::sys::getEnv( MAGAOX_env_rawimage );
530 if( tmpstr == "" )
531 {
533 }
534 m_rawimageDir = basePath() + "/" + tmpstr + "/" + m_outName;
535
536 config( m_rawimageDir, "writer.savePath" );
537
538 if( telemeterT::loadConfig( config ) < 0 )
539 {
540 log<text_log>( "Error during telemeter config", logPrio::LOG_CRITICAL );
541 m_shutdown = true;
542 }
543}
544
546{
547 // Create save directory.
548 errno = 0;
549 if( mkdir( m_rawimageDir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH ) < 0 )
550 {
551 if( errno != EEXIST )
552 {
553 std::stringstream logss;
554 logss << "Failed to create image directory (" << m_rawimageDir << "). Errno says: " << strerror( errno );
556
557 return -1;
558 }
559 }
560
561 // set up the INDI properties
564
565 // Register the stats INDI property
566 REG_INDI_NEWPROP_NOCB( m_indiP_xrifStats, "xrif", pcf::IndiProperty::Number );
567 m_indiP_xrifStats.setLabel( "xrif compression performance" );
568
569 indi::addNumberElement<float>( m_indiP_xrifStats, "ratio", 0, 1.0, 0.0, "%0.2f", "Compression Ratio" );
570
571 indi::addNumberElement<float>( m_indiP_xrifStats,
572 "differenceMBsec",
573 0,
574 std::numeric_limits<float>::max(),
575 0.0,
576 "%0.2f",
577 "Differencing Rate [MB/sec]" );
578
579 indi::addNumberElement<float>( m_indiP_xrifStats,
580 "reorderMBsec",
581 0,
582 std::numeric_limits<float>::max(),
583 0.0,
584 "%0.2f",
585 "Reordering Rate [MB/sec]" );
586
587 indi::addNumberElement<float>( m_indiP_xrifStats,
588 "compressMBsec",
589 0,
590 std::numeric_limits<float>::max(),
591 0.0,
592 "%0.2f",
593 "Compression Rate [MB/sec]" );
594
595 indi::addNumberElement<float>( m_indiP_xrifStats,
596 "encodeMBsec",
597 0,
598 std::numeric_limits<float>::max(),
599 0.0,
600 "%0.2f",
601 "Total Encoding Rate [MB/sec]" );
602
603 indi::addNumberElement<float>( m_indiP_xrifStats,
604 "differenceFPS",
605 0,
606 std::numeric_limits<float>::max(),
607 0.0,
608 "%0.2f",
609 "Differencing Rate [f.p.s.]" );
610
611 indi::addNumberElement<float>( m_indiP_xrifStats,
612 "reorderFPS",
613 0,
614 std::numeric_limits<float>::max(),
615 0.0,
616 "%0.2f",
617 "Reordering Rate [f.p.s.]" );
618
619 indi::addNumberElement<float>( m_indiP_xrifStats,
620 "compressFPS",
621 0,
622 std::numeric_limits<float>::max(),
623 0.0,
624 "%0.2f",
625 "Compression Rate [f.p.s.]" );
626
627 indi::addNumberElement<float>( m_indiP_xrifStats,
628 "encodeFPS",
629 0,
630 std::numeric_limits<float>::max(),
631 0.0,
632 "%0.2f",
633 "Total Encoding Rate [f.p.s.]" );
634
635 // Now set up the framegrabber and writer threads.
636 // - need SIGSEGV and SIGBUS handling for ImageStreamIO restarts
637 // - initialize the semaphore
638 // - start the threads
639
640 if( setSigSegvHandler() < 0 )
641 return log<software_error, -1>( { __FILE__, __LINE__ } );
642
643 if( sem_init( &m_swSemaphore, 0, 0 ) < 0 )
644 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "Initializing S.W. semaphore" } );
645
646 // Check if we have a safe writeChunkLength
648 {
649 return log<software_critical, -1>(
650 { __FILE__, __LINE__, "Write chunk length is not a divisor of circular buffer length." } );
651 }
652
653 if( initialize_xrif() < 0 )
655
662 "framegrabber",
663 this,
664 fgThreadStart ) < 0 )
665 {
666 return log<software_critical, -1>( { __FILE__, __LINE__ } );
667 }
668
675 "streamwriter",
676 this,
677 swThreadStart ) < 0 )
678 {
680 }
681
682 if( telemeterT::appStartup() < 0 )
683 {
684 return log<software_error, -1>( { __FILE__, __LINE__ } );
685 }
686
687 return 0;
688}
689
691{
692
693 // first do a join check to see if other threads have exited.
694 // these will throw if the threads are really gone
695 try
696 {
697 if( pthread_tryjoin_np( m_fgThread.native_handle(), 0 ) == 0 )
698 {
699 log<software_error>( { __FILE__, __LINE__, "framegrabber thread has exited" } );
700 return -1;
701 }
702 }
703 catch( ... )
704 {
705 log<software_error>( { __FILE__, __LINE__, "framegrabber thread has exited" } );
706 return -1;
707 }
708
709 try
710 {
711 if( pthread_tryjoin_np( m_swThread.native_handle(), 0 ) == 0 )
712 {
713 log<software_error>( { __FILE__, __LINE__, "stream thread has exited" } );
714 return -1;
715 }
716 }
717 catch( ... )
718 {
719 log<software_error>( { __FILE__, __LINE__, "streamwriter thread has exited" } );
720 return -1;
721 }
722
723 switch( m_writing )
724 {
725 case NOT_WRITING:
727 break;
728 default:
730 }
731
733 {
734 if( telemeterT::appLogic() < 0 )
735 {
737 return 0;
738 }
739 }
740
741 updateINDI();
742
743 return 0;
744}
745
747{
749 updateINDI();
750
751 try
752 {
753 if( m_fgThread.joinable() )
754 {
755 m_fgThread.join();
756 }
757 }
758 catch( ... )
759 {
760 }
761
762 try
763 {
764 if( m_swThread.joinable() )
765 {
766 m_swThread.join();
767 }
768 }
769 catch( ... )
770 {
771 }
772
773 if( m_xrif )
774 {
776 m_xrif = nullptr;
777 }
778
779 if( m_xrif_timing )
780 {
782 m_xrif_timing = nullptr;
783 }
784
786
787 return 0;
788}
789
791{
793 if( rv != XRIF_NOERROR )
794 {
795 return log<software_critical, -1>(
796 { __FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error." } );
797 }
798
799 if( m_compress )
800 {
802 if( rv != XRIF_NOERROR )
803 {
804 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
805 }
806 }
807 else
808 {
809 std::cerr << "not compressing . . . \n";
811 if( rv != XRIF_NOERROR )
812 {
813 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
814 }
815 }
816
817 errno = 0;
818 m_xrif_header = reinterpret_cast< char *>(malloc( XRIF_HEADER_SIZE * sizeof( char ) ));
819 if( m_xrif_header == NULL )
820 {
821 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "xrif header allocation failed." } );
822 }
823
825 if( rv != XRIF_NOERROR )
826 {
827 return log<software_critical, -1>(
828 { __FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error." } );
829 }
830
831 // m_xrif_timing->reorder_method = XRIF_REORDER_NONE;
833 if( rv != XRIF_NOERROR )
834 {
835 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
836 }
837
838 errno = 0;
839 m_xrif_timing_header = reinterpret_cast< char *>(malloc( XRIF_HEADER_SIZE * sizeof( char ) ));
841 {
842 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "xrif header allocation failed." } );
843 }
844
845 return 0;
846}
847
849{
850 struct sigaction act;
851 sigset_t set;
852
853 act.sa_sigaction = &streamWriter::_handlerSigSegv;
854 act.sa_flags = SA_SIGINFO;
855 sigemptyset( &set );
856 act.sa_mask = set;
857
858 errno = 0;
859 if( sigaction( SIGSEGV, &act, 0 ) < 0 )
860 {
861 std::string logss = "Setting handler for SIGSEGV failed. Errno says: ";
862 logss += strerror( errno );
863
865
866 return -1;
867 }
868
869 errno = 0;
870 if( sigaction( SIGBUS, &act, 0 ) < 0 )
871 {
872 std::string logss = "Setting handler for SIGBUS failed. Errno says: ";
873 logss += strerror( errno );
874
876
877 return -1;
878 }
879
880 log<text_log>( "Installed SIGSEGV/SIGBUS signal handler.", logPrio::LOG_DEBUG );
881
882 return 0;
883}
884
/// Static handler installed through sigaction for SIGSEGV and SIGBUS.
/** NOTE(review): the body line is not visible in this listing. The class declaration describes this
 * function as just a wrapper for handlerSigSegv, presumably reached through m_selfWriter -- confirm.
 */
885void streamWriter::_handlerSigSegv( int signum, siginfo_t *siginf, void *ucont )
886{
888}
889
/// Instance-side handler for SIGSEGV and SIGBUS.
/** All three arguments are deliberately unused; the only effect is to set m_restart to true.
 *
 * NOTE(review): m_restart is declared as a plain bool and is also tested in the image grabbing loops.
 * Confirm that a plain bool is adequate for a flag written from a signal handler.
 */
890void streamWriter::handlerSigSegv( int signum, siginfo_t *siginf, void *ucont )
891{
    // The casts to void only silence unused-parameter warnings.
892 static_cast<void>( signum );
893 static_cast<void>( siginf );
894 static_cast<void>( ucont );
895
896 m_restart = true;
897
898 return;
899}
900
/// Work out circular buffer and write-chunk sizes for a given frame geometry.
/** The three reference arguments are outputs; the remaining arguments are the configured maxima and
 * the frame dimensions (width, height, and bytes per pixel).
 *
 * NOTE(review): several assignment lines inside this body are not visible in this listing (the
 * bodies of some branches appear empty), so the comments below describe only the statements shown.
 */
901void streamWriter::getCircBuffLengths( size_t &circBuffLength,
902 double &circBuffSize,
903 size_t &writeChunkLength,
904 size_t maxCircBuffLength,
905 double maxCircBuffSize,
906 size_t maxWriteChunkLength,
907 uint32_t width,
908 uint32_t height,
909 size_t typeSize )
910{
911 static constexpr double MB = 1048576.0;
912
    // Bytes needed to hold maxCircBuffLength frames.
    // NOTE(review): width * height is evaluated in 32 bits before widening to size_t -- confirm frame
    // sizes can never reach 2^32 pixels.
913 size_t isz = width * height * typeSize * maxCircBuffLength;
914
    // Case 1: the full-length buffer fits within the size limit (maxCircBuffSize is in MB).
915 if( isz <= maxCircBuffSize * MB )
916 {
918 circBuffSize = isz / MB;
920
921 return;
922 }
923
    // Case 2: too large, so take the number of whole frames that fit in maxCircBuffSize MB.
924 circBuffLength = maxCircBuffSize * MB / ( width * height * typeSize );
925
    // Odd lengths get special handling; the adjustment itself is not visible here.
926 if(circBuffLength % 2 == 1)
927 {
929 }
930
    // Report the size, in MB, of the buffer at the reduced length.
931 circBuffSize = ( width * height * typeSize * circBuffLength ) / MB;
932
934
    // Not even one frame fits: return before the modulo loop below.
935 if( circBuffLength == 0 )
936 {
937 return;
938 }
939
    // A zero chunk length would make the modulo below divide by zero; the replacement value is not visible here.
940 if( writeChunkLength == 0 )
941 {
943 }
944
    // Loop until writeChunkLength divides circBuffLength exactly; the per-iteration adjustment is not visible here.
945 while( circBuffLength % writeChunkLength != 0 )
946 {
948 }
949
950 return;
951}
952
954{
955
962 m_width,
963 m_height,
964 m_typeSize );
965
966 if( m_circBuffLength < 2 )
967 {
968 return log<software_critical, -1>( { __FILE__, __LINE__, "frame size too large to fit in maxCircBuffSize" } );
969 }
970
972 {
973 return log<software_critical, -1>(
974 { __FILE__, __LINE__, "writeChunkLength is not smaller than circBuffLength" } );
975 }
976
978 {
979 return log<software_critical, -1>(
980 { __FILE__, __LINE__, "writeChunkLength is not an integer factor of circBuffLength" } );
981 }
982
983 std::string msg = "Set circ buff length: " + std::to_string( m_circBuffLength ) + " frames (";
984 msg += std::to_string( m_circBuffSize ) + " MB). Write chunk length: " + std::to_string( m_writeChunkLength );
985 msg += " frames.";
986
988
990 {
992 }
993
994 errno = 0;
995 m_rawImageCircBuff = reinterpret_cast< char *>(malloc( m_width * m_height * m_typeSize * m_circBuffLength ));
996
997 if( m_rawImageCircBuff == NULL )
998 {
999 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "buffer allocation failure" } );
1000 }
1001
1002 if( m_timingCircBuff )
1003 {
1005 }
1006
1007 errno = 0;
1008 m_timingCircBuff = reinterpret_cast<uint64_t *>(malloc( 5 * sizeof( uint64_t ) * m_circBuffLength ));
1009 if( m_timingCircBuff == NULL )
1010 {
1011 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "buffer allocation failure" } );
1012 }
1013
1014 return 0;
1015}
1016
1018{
1019 // Set up the image data xrif handle
1021
1022 if( m_compress )
1023 {
1025 if( rv != XRIF_NOERROR )
1026 {
1027 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1028 }
1029 }
1030 else
1031 {
1032 std::cerr << "not compressing . . . \n";
1034 if( rv != XRIF_NOERROR )
1035 {
1036 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1037 }
1038 }
1039
1041 if( rv != XRIF_NOERROR )
1042 {
1043 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_set_size error." } );
1044 }
1045
1047 if( rv != XRIF_NOERROR )
1048 {
1049 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_raw error." } );
1050 }
1051
1053 if( rv != XRIF_NOERROR )
1054 {
1055 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error." } );
1056 }
1057
1058 // Set up the timing data xrif handle
1060 if( rv != XRIF_NOERROR )
1061 {
1062 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1063 }
1064
1066 if( rv != XRIF_NOERROR )
1067 {
1068 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_set_size error." } );
1069 }
1070
1072 if( rv != XRIF_NOERROR )
1073 {
1074 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_raw error." } );
1075 }
1076
1078 if( rv != XRIF_NOERROR )
1079 {
1080 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error." } );
1081 }
1082
1083 return 0;
1084}
1085
1087{
1088 o->fgThreadExec();
1089}
1090
1092{
1094
1095 // Wait for the thread starter to finish initializing this thread.
1096 while( m_fgThreadInit == true && m_shutdown == 0 )
1097 {
1098 sleep( 1 );
1099 }
1100
1101 timespec missing_ts;
1102
1103 IMAGE image;
1104 ino_t inode = 0; // The inode of the image stream file
1105
1106 bool opened = false;
1107
1108 while( m_shutdown == 0 )
1109 {
1110 /* Initialize ImageStreamIO
1111 */
1112 opened = false;
1113 m_restart = false; // Set this up front, since we're about to restart.
1114
1115 sem_t *sem{ nullptr }; ///< The semaphore to monitor for new image data
1116
1117 int logged = 0;
1118 while( !opened && !m_shutdown && !m_restart )
1119 {
1120 // b/c ImageStreamIO prints every single time, and latest version don't support stopping it yet,
1121 // and that isn't thread-safe-able anyway
1122 // we do our own checks. This is the same code in ImageStreamIO_openIm...
1123 int SM_fd;
1124 char SM_fname[200];
1125 ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1126 SM_fd = open( SM_fname, O_RDWR );
1127 if( SM_fd == -1 )
1128 {
1129 if( !logged )
1130 {
1131 log<text_log>( "ImageStream " + m_shmimName + " not found (yet). Retrying . . .",
1133 }
1134 logged = 1;
1135 sleep( 1 ); // be patient
1136 continue;
1137 }
1138
1139 // Found and opened, close it and then use ImageStreamIO
1140 logged = 0;
1141 close( SM_fd );
1142
1143 if( ImageStreamIO_openIm( &image, m_shmimName.c_str() ) == 0 )
1144 {
1145 if( image.md[0].sem <=
1146 m_semaphoreNumber ) ///<\todo this isn't right--> isn't there a define in cacao to use?
1147 {
1149 mx::sys::sleep( 1 ); // We just need to wait for the server process to finish startup.
1150 }
1151 else
1152 {
1153 opened = true;
1154
1155 char SM_fname[200];
1156 ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1157
1158 struct stat buffer;
1159 int rv = stat( SM_fname, &buffer );
1160
1161 if( rv != 0 )
1162 {
1164 __LINE__,
1165 errno,
1166 "Could not get inode for " + m_shmimName +
1167 ". Source process will need to be restarted." } );
1169 return;
1170 }
1171 inode = buffer.st_ino;
1172 }
1173 }
1174 else
1175 {
1176 mx::sys::sleep( 1 ); // be patient
1177 }
1178 }
1179
1180 if( m_restart )
1181 continue; // this is kinda dumb. we just go around on restart, so why test in the while loop at all?
1182
1183 if( m_shutdown || !opened )
1184 {
1185 if( !opened )
1186 return;
1187
1189 return;
1190 }
1191
1192 // now get a good semaphore
1194 ImageStreamIO_getsemwaitindex( &image, m_semaphoreNumber ); // ask for semaphore we had before
1195
1196 if( m_semaphoreNumber < 0 )
1197 {
1199 { __FILE__,
1200 __LINE__,
1201 "No valid semaphore found for " + m_shmimName + ". Source process will need to be restarted." } );
1202 return;
1203 }
1204
1206 __LINE__,
1207 "got semaphore index " + std::to_string( m_semaphoreNumber ) + " for " + m_shmimName } );
1208
1210
1211 sem = image.semptr[m_semaphoreNumber]; ///< The semaphore to monitor for new image data
1212
1213 m_dataType = image.md[0].datatype;
1215 m_width = image.md[0].size[0];
1216 m_height = image.md[0].size[1];
1217 size_t length;
1218 if( image.md[0].naxis == 3 )
1219 {
1220 length = image.md[0].size[2];
1221 }
1222 else
1223 {
1224 length = 1;
1225 }
1226 std::cerr << "connected"
1227 << " " << m_width << "x" << m_height << "x" << (int)m_dataType << " (" << m_typeSize << ")"
1228 << std::endl;
1229
1230 // Now allocate the circBuffs
1231 if( allocate_circbufs() < 0 )
1232 {
1233 return; // will cause shutdown!
1234 }
1235
1236 // And allocate the xrifs
1237 if( allocate_xrif() < 0 )
1238 {
1239 return; // Will cause shutdown!
1240 }
1241
1242 uint8_t atype;
1243 size_t snx, sny, snz;
1244
1245 uint64_t curr_image; // The current cnt1 index
1246 m_currImage = 0;
1247 m_currChunkStart = 0;
1248 m_nextChunkStart = 0;
1249
1250 // Initialized curr_image ...
1251 if( image.md[0].naxis > 2 )
1252 {
1253 curr_image = image.md[0].cnt1;
1254 }
1255 else
1256 {
1257 curr_image = 0;
1258 }
1259
1260 uint64_t last_cnt0; // = ((uint64_t)-1);
1261
1262 // so we can initialize last_cnt0 to avoid frame skip on startup
1263 if( image.cntarray )
1264 {
1265 last_cnt0 = image.cntarray[curr_image];
1266 }
1267 else
1268 {
1269 last_cnt0 = image.md[0].cnt0;
1270 }
1271
1272 int cnt0flag = 0;
1273
1274 bool restartWriting = false; // flag to prevent logging on a logging restart
1275
1276 // This is the main image grabbing loop.
1277 while( !m_shutdown && !m_restart )
1278 {
1279 timespec ts;
1281
1282 if( sem_timedwait( sem, &ts ) == 0 )
1283 {
1284 if( image.md[0].naxis > 2 )
1285 {
1286 curr_image = image.md[0].cnt1;
1287 }
1288 else
1289 {
1290 curr_image = 0;
1291 }
1292
1293 atype = image.md[0].datatype;
1294 snx = image.md[0].size[0];
1295 sny = image.md[0].size[1];
1296 if( image.md[0].naxis == 3 )
1297 {
1298 snz = image.md[0].size[2];
1299 }
1300 else
1301 {
1302 snz = 1;
1303 }
1304
1305 if( atype != m_dataType || snx != m_width || sny != m_height || snz != length )
1306 {
1307 break; // exit the nearest while loop and get the new image setup.
1308 }
1309
1310 if( m_shutdown || m_restart )
1311 {
1312 break; // Check for exit signals
1313 }
1314
1316 if( image.cntarray )
1317 {
1318 new_cnt0 = image.cntarray[curr_image];
1319 }
1320 else
1321 {
1322 new_cnt0 = image.md[0].cnt0;
1323 }
1324
1325#ifdef SW_DEBUG
1326 std::cerr << "new_cnt0: " << new_cnt0 << "\n";
1327#endif
1328
1329 ///\todo cleanup skip frame handling.
1330 if( new_cnt0 == last_cnt0 ) //<- this probably isn't useful really
1331 {
1332 log<text_log>( "semaphore raised but cnt0 has not changed -- we're probably getting behind",
1334 ++cnt0flag;
1335 if( cnt0flag > 10 )
1336 {
1337 m_restart = true; // if we get here 10 times then something else is wrong.
1338 }
1339 continue;
1340 }
1341
1342 if( new_cnt0 - last_cnt0 > 1 ) //<- this is what we want to check.
1343 {
1344 log<text_log>( "cnt0 changed by more than 1. Frame skipped.", logPrio::LOG_WARNING );
1345 }
1346
1347 cnt0flag = 0;
1348
1350
1352 char *curr_src = reinterpret_cast< char *>(image.array.raw) + curr_image * m_width * m_height * m_typeSize;
1353
1355
1357
1358 if( image.cntarray )
1359 {
1360 curr_timing[0] = image.cntarray[curr_image];
1361 curr_timing[1] = image.atimearray[curr_image].tv_sec;
1362 curr_timing[2] = image.atimearray[curr_image].tv_nsec;
1363 curr_timing[3] = image.writetimearray[curr_image].tv_sec;
1364 curr_timing[4] = image.writetimearray[curr_image].tv_nsec;
1365 }
1366 else
1367 {
1368 curr_timing[0] = image.md[0].cnt0;
1369 curr_timing[1] = image.md[0].atime.tv_sec;
1370 curr_timing[2] = image.md[0].atime.tv_nsec;
1371 curr_timing[3] = image.md[0].writetime.tv_sec;
1372 curr_timing[4] = image.md[0].writetime.tv_nsec;
1373 }
1374
1375 // Check if we need to time-stamp ourselves -- for old cacao streams
1376 if( curr_timing[1] == 0 )
1377 {
1378
1380 {
1381 log<software_critical>( { __FILE__, __LINE__, errno, 0, "clock_gettime" } );
1382 return;
1383 }
1384
1385 curr_timing[1] = missing_ts.tv_sec;
1386 curr_timing[2] = missing_ts.tv_nsec;
1387 }
1388
1389 // just set w-time to a-time if it's missing
1390 if( curr_timing[3] == 0 )
1391 {
1392 curr_timing[3] = curr_timing[1];
1393 curr_timing[4] = curr_timing[2];
1394 }
1395
1396 m_currImageTime = 1.0 * curr_timing[3] + ( 1.0 * curr_timing[4] ) / 1e9;
1397
1398 if( m_shutdown && m_writing == WRITING )
1399 {
1401 }
1402
1403 switch( m_writing )
1404 {
1405 case START_WRITING:
1406
1410
1411 if( !restartWriting ) // We only log if this is really a start
1412 {
1413 log<saving_start>( { 1, new_cnt0 } );
1414 }
1415 else // on a restart after a timeout we don't log
1416 {
1417 restartWriting = false;
1418 }
1419
1421
1422 // fall through
1423 case WRITING:
1425 {
1429
1430#ifdef SW_DEBUG
1431 std::cerr << __FILE__ << " " << __LINE__ << " WRITING " << m_currImage << " "
1432 << m_nextChunkStart << " "
1433 << ( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 ) << " "
1435 << "\n";
1436#endif
1437
1438 // Now tell the writer to get going
1439 if( sem_post( &m_swSemaphore ) < 0 )
1440 {
1441 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1442 return;
1443 }
1444
1447 {
1448 m_nextChunkStart = 0;
1449 }
1450
1453 }
1455 {
1459
1460#ifdef SW_DEBUG
1461 std::cerr << __FILE__ << " " << __LINE__ << " IMAGE TIME WRITING " << m_currImage << " "
1462 << m_nextChunkStart << " "
1463 << ( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 ) << " "
1465 << "\n";
1466#endif
1467
1468 // Now tell the writer to get going
1469 if( sem_post( &m_swSemaphore ) < 0 )
1470 {
1471 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1472 return;
1473 }
1474
1476 restartWriting = true;
1477 }
1478 break;
1479
1480 case STOP_WRITING:
1484
1485#ifdef SW_DEBUG
1486 std::cerr << __FILE__ << " " << __LINE__ << " STOP_WRITING\n";
1487#endif
1488
1489 // Now tell the writer to get going
1490 if( sem_post( &m_swSemaphore ) < 0 )
1491 {
1492 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1493 return;
1494 }
1495 restartWriting = false;
1496 break;
1497
1498 default:
1499 break;
1500 }
1501
1502 ++m_currImage;
1504 {
1505 m_currImage = 0;
1506 }
1507 }
1508 else
1509 {
1510 // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1511 // we can also get here if a signal interrupts the sem wait which is triggered by INDI callbacks
1512 switch( m_writing )
1513 {
1514 case WRITING:
1515 // Here, if there is at least 1 image, we check for delta-time > m_maxChunkTime
1516 // then write
1517 if( ( m_currImage - m_nextChunkStart > 0 ) &&
1518 ( mx::sys::get_curr_time() - m_currChunkStartTime > m_maxChunkTime ) )
1519 {
1523
1524#ifdef SW_DEBUG
1525 std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT WRITING " << " " << m_currImage << " "
1526 << m_nextChunkStart << " " << ( m_currImage - m_nextChunkStart ) << " " << last_cnt0
1527 << "\n";
1528#endif
1529
1530 // Now tell the writer to get going
1531 if( sem_post( &m_swSemaphore ) < 0 )
1532 {
1533 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1534 return;
1535 }
1536
1538 restartWriting = true;
1539 }
1540 break;
1541 case STOP_WRITING:
1542 // If we timed-out while STOP_WRITING is set, we trigger a write.
1546
1547#ifdef SW_DEBUG
1548 std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT STOP_WRITING\n";
1549#endif
1550
1551 // Now tell the writer to get going
1552 if( sem_post( &m_swSemaphore ) < 0 )
1553 {
1554 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1555 return;
1556 }
1557 restartWriting = false;
1558 break;
1559 default:
1560 break;
1561 }
1562
1563 if( image.md[0].sem <= 0 )
1564 {
1565 break; // Indicates that the server has cleaned up.
1566 }
1567
1568 // Check for why we timed out
1569 if( errno == EINTR )
1570 {
1571 break; // This will indicate time to shutdown, loop will exit normally flags set.
1572 }
1573
1574 // ETIMEDOUT just means we should wait more.
1575 // Otherwise, report an error.
1576 if( errno != ETIMEDOUT )
1577 {
1578 log<software_error>( { __FILE__, __LINE__, errno, "sem_timedwait" } );
1579 break;
1580 }
1581
1582 // Check if the file has disappeared.
1583 int SM_fd;
1584 char SM_fname[200];
1585 ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1586 SM_fd = open( SM_fname, O_RDWR );
1587 if( SM_fd == -1 )
1588 {
1589 m_restart = true;
1590 }
1591 close( SM_fd );
1592
1593 // Check if the inode changed
1594 struct stat buffer;
1595 int rv = stat( SM_fname, &buffer );
1596 if( rv != 0 )
1597 {
1598 m_restart = true;
1599 }
1600
1601 if( buffer.st_ino != inode )
1602 {
1603#ifdef SW_DEBUG
1604 std::cerr << "Restarting due to inode . . . \n";
1605#endif
1606 m_restart = true;
1607 }
1608 }
1609 }
1610
1611 ///\todo might still be writing here, so must check
1612 // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1614 {
1615 // Here, if there is at least 1 image, then write
1616 if( ( m_currImage - m_nextChunkStart > 0 ) )
1617 {
1621
1623
1624 std::cerr << __FILE__ << " " << __LINE__ << " WRITING ON RESTART " << last_cnt0 << "\n";
1625 // Now tell the writer to get going
1626 if( sem_post( &m_swSemaphore ) < 0 )
1627 {
1628 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1629 return;
1630 }
1631 }
1632 else
1633 {
1635 }
1636
1637 while( m_writing != NOT_WRITING )
1638 {
1639 std::cerr << __FILE__ << " " << __LINE__ << " WAITING TO FINISH WRITING " << last_cnt0 << "\n";
1640 sleep( 1 );
1641 }
1642 }
1643
1644 if( m_rawImageCircBuff )
1645 {
1648 }
1649
1650 if( m_timingCircBuff )
1651 {
1653 m_timingCircBuff = 0;
1654 }
1655
1656 if( opened )
1657 {
1658 if( m_semaphoreNumber >= 0 )
1659 {
1660 ///\todo is this release necessary with closeIM?
1661 image.semReadPID[m_semaphoreNumber] = 0; // release semaphore
1662 }
1664 opened = false;
1665 }
1666
1667 } // outer loop, will exit if m_shutdown==true
1668
1669 // One more check
1670 if( m_rawImageCircBuff )
1671 {
1674 }
1675
1676 if( m_timingCircBuff )
1677 {
1679 m_timingCircBuff = 0;
1680 }
1681
1682 if( opened )
1683 {
1684 if( m_semaphoreNumber >= 0 )
1685 {
1686 ///\todo is this release necessary with closeIM?
1687 image.semReadPID[m_semaphoreNumber] = 0; // release semaphore.
1688 }
1689
1691 }
1692}
1693
1695{
1696 s->swThreadExec();
1697}
1698
1700{
1702
1703 // Wait for the thread starter to finish initializing this thread.
1704 while( m_swThreadInit == true && m_shutdown == 0 )
1705 {
1706 sleep( 1 );
1707 }
1708
1709 while( !m_shutdown )
1710 {
1711 while( !shutdown() && ( !( state() == stateCodes::READY || state() == stateCodes::OPERATING ) ) )
1712 {
1713 if( m_fname )
1714 {
1715 free( m_fname );
1716 m_fname = nullptr;
1717 }
1718 sleep( 1 );
1719 }
1720
1721 if( shutdown() )
1722 {
1723 break;
1724 }
1725
1726 // This will happen after a reconnection, and could update m_shmimName, etc.
1727 if( m_fname == nullptr )
1728 {
1729 m_fnameBase = m_rawimageDir + "/" + m_outName + "_";
1730
1731 m_fnameSz = m_fnameBase.size() + sizeof( "YYYYMMDDHHMMSSNNNNNNNNN.xrif" ); // the sizeof includes the \0
1732 m_fname = reinterpret_cast< char *>(malloc( m_fnameSz ));
1733
1734 snprintf( m_fname, m_fnameSz, "%sYYYYMMDDHHMMSSNNNNNNNNN.xrif", m_fnameBase.c_str() );
1735 }
1736
1737 // at this point fname is not null.
1738
1739 timespec ts;
1740
1741 if( clock_gettime( CLOCK_REALTIME, &ts ) < 0 )
1742 {
1743 log<software_critical>( { __FILE__, __LINE__, errno, 0, "clock_gettime" } );
1744
1745 free( m_fname );
1746 m_fname = nullptr;
1747
1748 return; // will trigger a shutdown
1749 }
1750
1751 mx::sys::timespecAddNsec( ts, m_semWaitNSec );
1752
1753 if( sem_timedwait( &m_swSemaphore, &ts ) == 0 )
1754 {
1755 if( doEncode() < 0 )
1756 {
1757 log<software_critical>( { __FILE__, __LINE__, "error encoding data" } );
1758 return;
1759 }
1760 // Otherwise, success, and we just go on.
1761 }
1762 else
1763 {
1764 // Check for why we timed out
1765 if( errno == EINTR )
1766 {
1767 continue; // This will probably indicate time to shutdown, loop will exit normally if flags set.
1768 }
1769
1770 // ETIMEDOUT just means we should wait more.
1771 // Otherwise, report an error.
1772 if( errno != ETIMEDOUT )
1773 {
1774 log<software_error>( { __FILE__, __LINE__, errno, "sem_timedwait" } );
1775 break;
1776 }
1777 }
1778 } // outer loop, will exit if m_shutdown==true
1779
1780 if( m_fname )
1781 {
1782 free( m_fname );
1783 m_fname = nullptr;
1784 }
1785}
1786
1788{
1789 if( m_writing == NOT_WRITING )
1790 {
1791 return 0;
1792 }
1793
1794 recordSavingState( true );
1795
1796 // Record these to prevent a change in other thread
1799 size_t nFrames = m_currSaveStop - saveStart;
1800 size_t nBytes = m_width * m_height * m_typeSize;
1801
1802#ifdef SW_DEBUG
1803 std::cerr << "nFrames: " << nFrames << "\n";
1804#endif
1805
1806 if(nFrames == 0) //can happend during a stop. just clean up but don't try to write nothting.
1807 {
1808 #ifdef SW_DEBUG
1809 std::cerr << "nothing to write\n";
1810 #endif
1811
1812 recordSavingStats( true );
1813
1814 if( m_writing == STOP_WRITING )
1815 {
1818 }
1819
1820 recordSavingState( true );
1821
1822 return 0;
1823 }
1824 // Configure xrif and copy image data -- this does no allocations
1826 if( rv != XRIF_NOERROR )
1827 {
1828 // This is a big problem. Report it as "ALERT" and go on.
1829 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST" } );
1830 }
1831
1833 if( rv != XRIF_NOERROR )
1834 {
1835 // This may just be out of range, it's only an error.
1836 log<software_error>( { __FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error." } );
1837 }
1838
1840
1841 // Configure xrif and copy timing data -- no allocations
1843 if( rv != XRIF_NOERROR )
1844 {
1845 // This is a big problem. Report it as "ALERT" and go on.
1846 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST." } );
1847 }
1848
1850 if( rv != XRIF_NOERROR )
1851 {
1852 // This may just be out of range, it's only an error.
1853 log<software_error>( { __FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error." } );
1854 }
1855
1856#ifdef SW_DEBUG
1857 for( size_t nF = 0; nF < nFrames; ++nF )
1858 {
1859 std::cerr << " " << ( m_timingCircBuff + saveStart * 5 + nF * 5 )[0] << "\n";
1860 }
1861#endif
1862
1863 memcpy( m_xrif_timing->raw_buffer, m_timingCircBuff + saveStart * 5, nFrames * 5 * sizeof( uint64_t ) );
1864
1865 rv = xrif_encode( m_xrif );
1866 if( rv != XRIF_NOERROR )
1867 {
1868 // This is a big problem. Report it as "ALERT" and go on.
1869 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST." } );
1870 }
1871
1873 if( rv != XRIF_NOERROR )
1874 {
1875 // This is a big problem. Report it as "ALERT" and go on.
1876 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST." } );
1877 }
1878
1880 if( rv != XRIF_NOERROR )
1881 {
1882 // This is a big problem. Report it as "ALERT" and go on.
1883 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST." } );
1884 }
1885
1887 if( rv != XRIF_NOERROR )
1888 {
1889 // This is a big problem. Report it as "ALERT" and go on.
1890 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST" } );
1891 }
1892
1893 // Now break down the acq time of the first image in the buffer for use in file name
1894 tm uttime; // The broken down time.
1895 timespec *fts = reinterpret_cast< timespec *>( m_timingCircBuff + saveStart * 5 + 1 );
1896
1897 if( gmtime_r( &fts->tv_sec, &uttime ) == 0 )
1898 {
1899 // Yell at operator but keep going
1901 { __FILE__, __LINE__, errno, 0, "gmtime_r error. possible loss of timing information." } );
1902 }
1903
1904 // Available size = m_fnameSz-m_fnameBase.size(), rather than assuming sizeof("YYYYMMDDHHMMSSNNNNNNNNN"), in case we
1905 // screwed up somewhere.
1906 rv = snprintf( m_fname + m_fnameBase.size(),
1907 m_fnameSz - m_fnameBase.size(),
1908 "%04i%02i%02i%02i%02i%02i%09i",
1909 uttime.tm_year + 1900,
1910 uttime.tm_mon + 1,
1911 uttime.tm_mday,
1912 uttime.tm_hour,
1913 uttime.tm_min,
1914 uttime.tm_sec,
1915 static_cast<int>( fts->tv_nsec ) );
1916
1917 if( rv != sizeof( "YYYYMMDDHHMMSSNNNNNNNNN" ) - 1 )
1918 {
1919 // Something is very wrong. Keep going to try to get it on disk.
1920 log<software_alert>( { __FILE__, __LINE__, errno, rv, "did not write enough chars to timestamp" } );
1921 }
1922
1923 // Cover up the \0 inserted by snprintf
1924 ( m_fname + m_fnameBase.size() )[23] = '.';
1925
1926 FILE *fp_xrif = fopen( m_fname, "wb" );
1927 if( fp_xrif == NULL )
1928 {
1929 // This is it. If we can't write data to disk need to fix.
1930 log<software_alert>( { __FILE__, __LINE__, errno, 0, "failed to open file for writing" } );
1931
1932 free( m_fname );
1933 m_fname = nullptr;
1934
1935 return -1; // will trigger a shutdown
1936 }
1937
1938 size_t bw = fwrite( m_xrif_header, sizeof( uint8_t ), XRIF_HEADER_SIZE, fp_xrif );
1939
1940 if( bw != XRIF_HEADER_SIZE )
1941 {
1943 __LINE__,
1944 errno,
1945 0,
1946 "failure writing header to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1947 // We go on . . .
1948 }
1949
1950 bw = fwrite( m_xrif->raw_buffer, sizeof( uint8_t ), m_xrif->compressed_size, fp_xrif );
1951
1952 if( bw != m_xrif->compressed_size )
1953 {
1955 __LINE__,
1956 errno,
1957 0,
1958 "failure writing data to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1959 }
1960
1962
1963 if( bw != XRIF_HEADER_SIZE )
1964 {
1966 { __FILE__,
1967 __LINE__,
1968 errno,
1969 0,
1970 "failure writing timing header to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1971 }
1972
1973 bw = fwrite( m_xrif_timing->raw_buffer, sizeof( uint8_t ), m_xrif_timing->compressed_size, fp_xrif );
1974
1975 if( bw != m_xrif_timing->compressed_size )
1976 {
1978 { __FILE__,
1979 __LINE__,
1980 errno,
1981 0,
1982 "failure writing timing data to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1983 }
1984
1985 fclose( fp_xrif );
1986
1987 recordSavingStats( true );
1988
1989 if( m_writing == STOP_WRITING )
1990 {
1993 }
1994
1995 recordSavingState( true );
1996
1997 return 0;
1998
1999} // doEncode
2000
2001INDI_NEWCALLBACK_DEFN( streamWriter, m_indiP_writing )
2002( const pcf::IndiProperty &ipRecv )
2003{
2004 INDI_VALIDATE_CALLBACK_PROPS( m_indiP_writing, ipRecv );
2005
2006 if( !ipRecv.find( "toggle" ) )
2007 {
2008 return 0;
2009 }
2010
2011 if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::Off &&
2012 ( m_writing == WRITING || m_writing == START_WRITING ) )
2013 {
2014 m_writing = STOP_WRITING;
2015 }
2016
2017 if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::On && m_writing == NOT_WRITING )
2018 {
2019 m_writing = START_WRITING;
2020 }
2021
2022 return 0;
2023}
2024
2026{
2027 // Only update this if not changing
2028 if( m_writing == NOT_WRITING || m_writing == WRITING )
2029 {
2030 if( m_xrif && m_writing == WRITING )
2031 {
2032 indi::updateSwitchIfChanged( m_indiP_writing, "toggle", pcf::IndiElement::On, m_indiDriver, INDI_OK );
2033 indi::updateIfChanged( m_indiP_xrifStats, "ratio", m_xrif->compression_ratio, m_indiDriver, INDI_BUSY );
2035 m_indiP_xrifStats, "encodeMBsec", m_xrif->encode_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2037 "encodeFPS",
2038 m_xrif->encode_rate / ( m_width * m_height * m_typeSize ),
2040 INDI_BUSY );
2042 m_indiP_xrifStats, "differenceMBsec", m_xrif->difference_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2044 "differenceFPS",
2045 m_xrif->difference_rate / ( m_width * m_height * m_typeSize ),
2047 INDI_BUSY );
2049 m_indiP_xrifStats, "reorderMBsec", m_xrif->reorder_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2051 "reorderFPS",
2052 m_xrif->reorder_rate / ( m_width * m_height * m_typeSize ),
2054 INDI_BUSY );
2056 m_indiP_xrifStats, "compressMBsec", m_xrif->compress_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2058 "compressFPS",
2059 m_xrif->compress_rate / ( m_width * m_height * m_typeSize ),
2061 INDI_BUSY );
2062 }
2063 else
2064 {
2065 indi::updateSwitchIfChanged( m_indiP_writing, "toggle", pcf::IndiElement::Off, m_indiDriver, INDI_OK );
2075 }
2076 }
2077}
2078
2083
2085{
2086 return recordSavingState( true );
2087}
2088
2090{
2091 static int16_t lastState = -1;
2092 static uint64_t currSaveStart = -1;
2093
2094 int16_t state;
2096 m_writing == STOP_WRITING ) // Changed from just writing 5/2024
2097 state = 1;
2098 else
2099 state = 0;
2100
2102 {
2104
2105 lastState = state;
2107 }
2108
2109 return 0;
2110}
2111
2113{
2114 static uint32_t last_rawSize = -1;
2115 static uint32_t last_compressedSize = -1;
2116 static float last_encodeRate = -1;
2117 static float last_differenceRate = -1;
2118 static float last_reorderRate = -1;
2119 static float last_compressRate = -1;
2120
2121 if( m_xrif->raw_size != last_rawSize || m_xrif->compressed_size != last_compressedSize ||
2122 m_xrif->encode_rate != last_encodeRate || m_xrif->difference_rate != last_differenceRate ||
2123 m_xrif->reorder_rate != last_reorderRate || m_xrif->compress_rate != last_compressRate || force )
2124 {
2125 telem<telem_saving>( { (uint32_t)m_xrif->raw_size,
2126 (uint32_t)m_xrif->compressed_size,
2127 (float)m_xrif->encode_rate,
2128 (float)m_xrif->difference_rate,
2129 (float)m_xrif->reorder_rate,
2130 (float)m_xrif->compress_rate } );
2131
2132 last_rawSize = m_xrif->raw_size;
2133 last_compressedSize = m_xrif->compressed_size;
2134 last_encodeRate = m_xrif->encode_rate;
2135 last_differenceRate = m_xrif->difference_rate;
2136 last_reorderRate = m_xrif->reorder_rate;
2137 last_compressRate = m_xrif->compress_rate;
2138 }
2139
2140 return 0;
2141}
2142
2143} // namespace app
2144} // namespace MagAOX
2145
2146#endif
The base-class for XWCTk applications.
std::string basePath()
Get the.
stateCodes::stateCodeT state()
Get the current state code.
int registerIndiPropertyNew(pcf::IndiProperty &prop, int(*)(void *, const pcf::IndiProperty &))
Register an INDI property which is exposed for others to request a New Property for.
int createStandardIndiToggleSw(pcf::IndiProperty &prop, const std::string &name, const std::string &label="", const std::string &group="")
Create a standard R/W INDI switch with a single toggle element.
int m_shutdown
Flag to signal it's time to shutdown. When not 0, the main loop exits.
int shutdown()
Get the value of the shutdown flag.
indiDriver< MagAOXApp > * m_indiDriver
The INDI driver wrapper. Constructed and initialized by execute, which starts and stops communication...
static int log(const typename logT::messageT &msg, logPrioT level=logPrio::LOG_DEFAULT)
Make a log entry.
int threadStart(std::thread &thrd, bool &thrdInit, pid_t &tpid, pcf::IndiProperty &thProp, int thrdPrio, const std::string &cpuset, const std::string &thrdName, thisPtr *thrdThis, Function &&thrdStart)
Start a thread, using this class's privileges to set priority, etc.
pcf::IndiProperty m_indiP_xrifStats
pcf::IndiProperty m_indiP_writing
double m_currImageTime
The write-time of the current image.
std::thread m_fgThread
A separate thread for the actual framegrabbings.
int m_fgThreadPrio
Priority of the framegrabber thread, should normally be > 0.
pid_t m_fgThreadID
F.g. thread PID.
size_t m_circBuffLength
The length of the circular buffer, in frames.
int initialize_xrif()
Initialize the xrif system.
int recordTelem(const telem_saving_state *)
uint8_t m_dataType
The ImageStreamIO type code.
unsigned m_semWaitSec
The time in whole sec to wait on the semaphore, to which m_semWaitNSec is added. Default is 0 nsec.
bool m_fgThreadInit
Synchronizer to ensure f.g. thread initializes before doing dangerous things.
int m_typeSize
The pixel byte depth.
xrif_t m_xrif_timing
The xrif compression handle for image timing data.
uint64_t m_currSaveStart
The circular buffer position at which to start saving.
virtual int appStartup()
Startup functions.
size_t m_writeChunkLength
The number of frames to write at a time.
size_t m_height
The height of the image.
uint64_t m_currSaveStopFrameNo
The frame number of the image at which saving stopped (for logging)
double m_currChunkStartTime
The write-time of the first image in the chunk.
int allocate_circbufs()
Worker function to allocate the circular buffers.
static void getCircBuffLengths(size_t &circBuffLength, double &circBuffSize, size_t &writeChunkLength, size_t maxCircBuffLength, double maxCircBuffSize, size_t maxWriteChunkLength, uint32_t width, uint32_t height, size_t typeSize)
void handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
Handles SIGSEGV and SIGBUS. Sets m_restart to true.
int m_swThreadPrio
Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
int recordSavingStats(bool force=false)
pcf::IndiProperty m_swThreadProp
The property to hold the s.w. thread details.
std::thread m_swThread
A separate thread for the actual writing.
std::string m_fgCpuset
The cpuset for the framegrabber thread. Ignored if empty (the default).
virtual int appLogic()
Implementation of the FSM for the stream writer.
static void fgThreadStart(streamWriter *s)
Thread starter, called by threadStart on thread construction. Calls fgThreadExec.
std::string m_outName
The name to use for outputting files, Default is m_shmimName.
size_t m_maxCircBuffLength
The maximum length of the circular buffer, in frames.
INDI_NEWCALLBACK_DECL(streamWriter, m_indiP_writing)
char * m_xrif_timing_header
Storage for the xrif timing data file header.
~streamWriter() noexcept
Destructor.
int allocate_xrif()
Worker function to configure and allocate the xrif handles.
int setSigSegvHandler()
Sets the handler for SIGSEGV and SIGBUS.
std::string m_swCpuset
The cpuset for the stream writer thread. Ignored if empty (the default).
xrif_t m_xrif
The xrif compression handle for image data.
virtual void setupConfig()
Setup the configuration system (called by MagAOXApp::setup())
int m_semaphoreNumber
The image structure semaphore index.
std::string m_rawimageDir
The path where files will be saved.
static void _handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
int recordSavingState(bool force=false)
int m_writing
Controls whether or not images are being written, and sequences start and stop of writing.
uint64_t m_currChunkStart
The circular buffer starting position of the current to-be-written chunk.
pid_t m_swThreadID
S.w. thread pid.
virtual void loadConfig()
load the configuration system results (called by MagAOXApp::setup())
pcf::IndiProperty m_fgThreadProp
The property to hold the f.g. thread details.
void swThreadExec()
Execute the stream writer main loop.
uint64_t m_nextChunkStart
The circular buffer starting position of the next to-be-written chunk.
static streamWriter * m_selfWriter
char * m_xrif_header
Storage for the xrif image data file header.
double m_maxChunkTime
The maximum time before writing regardless of number of frames.
double m_maxCircBuffSize
The maximum size of the circular buffer in MB.
int doEncode()
Function called when semaphore is raised to do the encode and write.
static void swThreadStart(streamWriter *s)
Thread starter, called by threadStart on thread construction. Calls swThreadExec.
uint64_t m_currSaveStop
The circular buffer position at which to stop saving.
void fgThreadExec()
Execute the frame grabber main loop.
std::string m_shmimName
The name of the shared memory buffer.
dev::telemeter< streamWriter > telemeterT
size_t m_width
The width of the image.
double m_circBuffSize
The size of the circular buffer, in MB.
virtual int appShutdown()
Do any needed shutdown tasks. Currently nothing in this app.
sem_t m_swSemaphore
Semaphore used to synchronize the fg thread and the sw thread.
bool m_swThreadInit
Synchronizer to ensure s.w. thread initializes before doing dangerous things.
#define MAGAOX_rawimageRelPath
The relative path to the raw images directory.
Definition paths.hpp:92
#define MAGAOX_env_rawimage
Environment variable setting the relative raw image path.
#define INDI_NEWCALLBACK_DEFN(class, prop)
Define the callback for a new property request.
#define REG_INDI_NEWPROP_NOCB(prop, propName, type)
Register a NEW INDI property with the class, with no callback.
#define INDI_NEWCALLBACK(prop)
Get the name of the static callback wrapper for a new property.
@ OPERATING
The device is operating, other than homing.
@ READY
The device is ready for operation, but is not operating.
#define INDI_VALIDATE_CALLBACK_PROPS(prop1, prop2)
Standard check for matching INDI properties in a callback.
#define INDI_IDLE
Definition indiUtils.hpp:27
#define INDI_BUSY
Definition indiUtils.hpp:29
#define INDI_OK
Definition indiUtils.hpp:28
void updateIfChanged(pcf::IndiProperty &p, const std::string &el, const T &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
Definition indiUtils.hpp:92
void updateSwitchIfChanged(pcf::IndiProperty &p, const std::string &el, const pcf::IndiElement::SwitchStateType &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
std::stringstream msg
const pcf::IndiProperty & ipRecv
Definition dm.hpp:26
static constexpr logPrioT LOG_NOTICE
A normal but significant condition.
static constexpr logPrioT LOG_CRITICAL
The process can not continue and will shut down (fatal)
static constexpr logPrioT LOG_WARNING
A condition has occurred which may become an error, but the process continues.
static constexpr logPrioT LOG_DEBUG
Used for debugging.
#define XWC_SEM_WAIT_TS_RETVOID(ts, sec, nsec)
Add the wait time to a timespec for a sem_timedwait call, with no value returned on error.
Definition semUtils.hpp:20
#define NOT_WRITING
#define STOP_WRITING
#define WRITING
#define START_WRITING
A device base class which saves telemetry.
Definition telemeter.hpp:69
int appShutdown()
Perform telemeter application shutdown.
int loadConfig(appConfigurator &config)
Load the device section from an application configurator.
int appLogic()
Perform telemeter application logic.
int setupConfig(appConfigurator &config)
Setup an application configurator for the device section.
int appStartup()
Starts the telemetry log thread.
int checkRecordTimes(const telT &tel, telTs... tels)
Check the time of the last record for each telemetry type and make an entry if needed.
Software CRITICAL log entry.
Software ERR log entry.