API
 
Loading...
Searching...
No Matches
streamWriter.hpp
Go to the documentation of this file.
1/** \file streamWriter.hpp
2 * \brief The MagAO-X Image Stream Writer
3 *
4 * \author Jared R. Males (jaredmales@gmail.com)
5 *
6 * \ingroup streamWriter_files
7 */
8
9#ifndef streamWriter_hpp
10#define streamWriter_hpp
11
12#include <filesystem>
13
14#include <ImageStreamIO/ImageStruct.h>
15#include <ImageStreamIO/ImageStreamIO.h>
16
17#include <xrif/xrif.h>
18
19#include <mx/sys/timeUtils.hpp>
20
21#include "../../libMagAOX/libMagAOX.hpp" //Note this is included on command line to trigger pch
22#include "../../magaox_git_version.h"
23
24#define NOT_WRITING ( 0 )
25#define START_WRITING ( 1 )
26#define WRITING ( 2 )
27#define STOP_WRITING ( 3 )
28
29// #define SW_DEBUG
30
31namespace MagAOX
32{
33namespace app
34{
35
36/** \defgroup streamWriter ImageStreamIO Stream Writing
37 * \brief Writes the contents of an ImageStreamIO image stream to disk.
38 *
39 * <a href="../handbook/operating/software/apps/streamWriter.html">Application Documentation</a>
40 *
41 * \ingroup apps
42 *
43 */
44
45/** \defgroup streamWriter_files ImageStreamIO Stream Writing
46 * \ingroup streamWriter
47 */
48
49/** MagAO-X application to control writing ImageStreamIO streams to disk.
50 *
51 * \ingroup streamWriter
52 *
53 */
54class streamWriter : public MagAOXApp<>, public dev::telemeter<streamWriter>
55{
57
58 friend class dev::telemeter<streamWriter>;
59
60 // Give the test harness access.
61 friend class streamWriter_test;
63
64 protected:
65 /** \name configurable parameters
66 *@{
67 */
68
69 std::string m_rawimageDir; ///< The path where files will be saved.
70
71 size_t m_maxCircBuffLength{ 1024 }; ///< The maximum length of the circular buffer, in frames
72
73 double m_maxCircBuffSize{ 2048 }; ///< The maximum size of the circular buffer in MB.
74
75 size_t m_maxWriteChunkLength{ 512 }; /**< The maximum number of frames to write at a time. Must
76 be an integer factor of m_maxCircBuffLength.*/
77
78 double m_maxChunkTime{ 10 }; ///< The maximum time before writing regardless of number of frames.
79
80 std::string m_shmimName; ///< The name of the shared memory buffer.
81
82 std::string m_outName; ///< The name to use for outputting files, Default is m_shmimName.
83
84 int m_semaphoreNumber{ 7 }; ///< The image structure semaphore index.
85
86 unsigned m_semWaitSec{ 0 }; /**< The time in whole sec to wait on the semaphore,
87 to which m_semWaitNSec is added. Default is 0 sec.*/
88
89 unsigned m_semWaitNSec{ 500000000 }; /**< The time in nsec to wait on the semaphore, added to m_semWaitSec.
90 Max is 999999999. Default is 5e8 nsec. */
91
92 int m_lz4accel{ 1 };
93
94 bool m_compress{ true };
95
96 ///@}
97
98 size_t m_circBuffLength{ 1024 }; ///< The length of the circular buffer, in frames
99 double m_circBuffSize{ 2048.0 }; ///< The size of the circular buffer, in MB
100 size_t m_writeChunkLength{ 512 }; ///< The number of frames to write at a time
101
102 size_t m_width{ 0 }; ///< The width of the image
103 size_t m_height{ 0 }; ///< The height of the image
104 uint8_t m_dataType{ 0 }; ///< The ImageStreamIO type code.
105 int m_typeSize{ 0 }; ///< The pixel byte depth
106
107 char *m_rawImageCircBuff{ nullptr };
109
110 size_t m_currImage{ 0 };
111
112 double m_currImageTime{ 0 }; ///< The write-time of the current image
113
114 double m_currChunkStartTime{ 0 }; ///< The write-time of the first image in the chunk
115
116 // Writer book-keeping:
117 int m_writing{ NOT_WRITING }; /**< Controls whether or not images are being written,
118 and sequences start and stop of writing.*/
119
120 uint64_t m_currChunkStart{ 0 }; ///< The circular buffer starting position of the current to-be-written chunk.
121 uint64_t m_nextChunkStart{ 0 }; ///< The circular buffer starting position of the next to-be-written chunk.
122
123 uint64_t m_currSaveStart{ 0 }; ///< The circular buffer position at which to start saving.
124 uint64_t m_currSaveStop{ 0 }; ///< The circular buffer position at which to stop saving.
125
126 uint64_t m_currSaveStopFrameNo{ 0 }; ///< The frame number of the image at which saving stopped (for logging)
127
128 /// The xrif compression handle for image data
129 xrif_t m_xrif{ nullptr };
130
131 /// Storage for the xrif image data file header
132 char *m_xrif_header{ nullptr };
133
134 /// The xrif compression handle for timing data
136
137 /// Storage for the xrif timing data file header
138 char *m_xrif_timing_header{ nullptr };
139
140 std::string m_outFilePath; ///< The full path for the latest output file
141
142 public:
143 /// Default c'tor
144 streamWriter();
145
146 /// Destructor
148
149 /// Setup the configuration system (called by MagAOXApp::setup())
150 virtual void setupConfig();
151
152 /// load the configuration system results (called by MagAOXApp::setup())
153 virtual void loadConfig();
154
155 /// Startup functions
156 /** Sets up the INDI vars.
157 *
158 */
159 virtual int appStartup();
160
161 /// Implementation of the FSM for the stream writer
162 virtual int appLogic();
163
164 /// Do any needed shutdown tasks: joins the framegrabber and stream writer threads and clears the xrif handles.
165 virtual int appShutdown();
166
167 protected:
168 /** \name SIGSEGV & SIGBUS signal handling
169 * These signals occur as a result of an ImageStreamIO source server resetting (e.g. changing frame sizes).
170 * When they occur a restart of the framegrabber and framewriter thread main loops is triggered.
171 *
172 * @{
173 */
174 bool m_restart{ false };
175
176 static streamWriter *m_selfWriter; ///< Static pointer to this (set in constructor). Used for getting out of the
177 ///< static SIGSEGV handler.
178
179 /// Initialize the xrif system.
180 /** Allocates the handles and headers pointers.
181 *
182 * \returns 0 on success.
183 * \returns -1 on error.
184 */
185 int initialize_xrif();
186
187 /// Sets the handler for SIGSEGV and SIGBUS
188 /** These are caused by ImageStreamIO server resets.
189 */
190 int setSigSegvHandler();
191
192 /// The handler called when SIGSEGV or SIGBUS is received, which will be due to ImageStreamIO server resets. Just a
193 /// wrapper for handlerSigSegv.
194 static void _handlerSigSegv( int signum, siginfo_t *siginf, void *ucont );
195
196 /// Handles SIGSEGV and SIGBUS. Sets m_restart to true.
197 void handlerSigSegv( int signum, siginfo_t *siginf, void *ucont );
198 ///@}
199
200 /** \name Framegrabber Thread
201 * This thread monitors the ImageStreamIO buffer and copies its images to the circular buffer.
202 *
203 * @{
204 */
205 int m_fgThreadPrio{ 1 }; ///< Priority of the framegrabber thread, should normally be > 0.
206
207 std::string m_fgCpuset; ///< The cpuset for the framegrabber thread. Ignored if empty (the default).
208
209 std::thread m_fgThread; ///< A separate thread for the actual framegrabbings
210
211 bool m_fgThreadInit{ true }; ///< Synchronizer to ensure f.g. thread initializes before doing dangerous things.
212
213 pid_t m_fgThreadID{ 0 }; ///< F.g. thread PID.
214
215 pcf::IndiProperty m_fgThreadProp; ///< The property to hold the f.g. thread details.
216
217 public:
218 static void getCircBuffLengths( size_t &circBuffLength,
219 double &circBuffSize,
220 size_t &writeChunkLength,
221 size_t maxCircBuffLength,
222 double maxCircBuffSize,
223 size_t maxWriteChunkLength,
224 uint32_t width,
225 uint32_t height,
226 size_t typeSize );
227
228 protected:
229 /// Worker function to allocate the circular buffers.
230 /** This takes place in the fg thread after connecting to the stream.
231 *
232 * \returns 0 on success.
233 * \returns -1 on error.
234 */
235 int allocate_circbufs();
236
237 /// Worker function to configure and allocate the xrif handles.
238 /** This takes place in the fg thread after connecting to the stream.
239 *
240 * \returns 0 on success.
241 * \returns -1 on error.
242 */
243 int allocate_xrif();
244
245 /// Thread starter, passed as the thread entry point when the framegrabber thread is launched in appStartup(). Calls fgThreadExec.
246 static void fgThreadStart( streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */ );
247
248 /// Execute the frame grabber main loop.
249 void fgThreadExec();
250
251 ///@}
252
253 /** \name Stream Writer Thread
254 * This thread writes chunks of the circular buffer to disk.
255 *
256 * @{
257 */
258 int m_swThreadPrio{ 1 }; ///< Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
259
260 std::string m_swCpuset; ///< The cpuset for the stream writer thread. Ignored if empty (the default).
261
262 sem_t m_swSemaphore; ///< Semaphore used to synchronize the fg thread and the sw thread.
263
264 std::thread m_swThread; ///< A separate thread for the actual writing
265
266 bool m_swThreadInit{ true }; ///< Synchronizer to ensure s.w. thread initializes before doing dangerous things.
267
268 pid_t m_swThreadID{ 0 }; ///< S.w. thread pid.
269
270 pcf::IndiProperty m_swThreadProp; ///< The property to hold the s.w. thread details.
271
272 /// Thread starter, passed as the thread entry point when the stream writer thread is launched in appStartup(). Calls swThreadExec.
273 static void swThreadStart( streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */ );
274
275 /// Execute the stream writer main loop.
276 void swThreadExec();
277
278 /// Function called when semaphore is raised to do the encode and write.
279 int doEncode();
280 ///@}
281
282 // INDI:
283 protected:
284 // declare our properties
285 pcf::IndiProperty m_indiP_writing;
286
287 pcf::IndiProperty m_indiP_xrifStats;
288
289 public:
291
292 void updateINDI();
293
294 /** \name Telemeter Interface
295 *
296 * @{
297 */
298 int checkRecordTimes();
299
300 int recordTelem( const telem_saving_state * );
301
302 int recordSavingState( bool force = false );
303 int recordSavingStats( bool force = false );
304
305 ///@}
306};
307
308// Set self pointer to null so app starts up uninitialized.
310
311streamWriter::streamWriter() : MagAOXApp( MAGAOX_CURRENT_SHA1, MAGAOX_REPO_MODIFIED ) // forwards the MAGAOX_CURRENT_SHA1 / MAGAOX_REPO_MODIFIED macros to the base class
312{
313 m_powerMgtEnabled = false; // power management is switched off for this app
314
315 m_selfWriter = this; // record this instance in the static pointer so the static signal handler wrapper can reach it
316
317 return;
318}
319
321{
322 if( m_xrif )
324
325 if( m_xrif_header )
327
328 if( m_xrif_timing )
330
333
334 return;
335}
336
338{
339 config.add( "writer.savePath",
340 "",
341 "writer.savePath",
342 argType::Required,
343 "writer",
344 "savePath",
345 false,
346 "string",
347 "The absolute path where images are saved. Will use MagAO-X default if not set." );
348
349 config.add( "writer.maxCircBuffLength",
350 "",
351 "writer.maxCircBuffLength",
352 argType::Required,
353 "writer",
354 "maxCircBuffLength",
355 false,
356 "size_t",
357 "The maximum length in frames of the circular buffer. Should be an integer multiple of and larger than "
358 "maxWriteChunkLength." );
359
360 config.add( "writer.maxCircBuffSize",
361 "",
362 "writer.maxCircBuffSize",
363 argType::Required,
364 "writer",
365 "maxCircBuffSize",
366 false,
367 "double",
368 "The maximum size in MB of the circular buffer. Should be sized to hold at least 2 of the maximum "
369 "frame size." );
370
371 config.add(
372 "writer.maxWriteChunkLength",
373 "",
374 "writer.maxWriteChunkLength",
375 argType::Required,
376 "writer",
377 "maxWriteChunkLength",
378 false,
379 "size_t",
380 "The maximum length in frames of the chunks to write to disk. Should be smaller than maxCircBuffLength." );
381
382 config.add( "writer.maxChunkTime",
383 "",
384 "writer.maxChunkTime",
385 argType::Required,
386 "writer",
387 "maxChunkTime",
388 false,
389 "float",
390 "The max length in seconds of the chunks to write to disk. Default is 60 sec." );
391
392 config.add( "writer.threadPrio",
393 "",
394 "writer.threadPrio",
395 argType::Required,
396 "writer",
397 "threadPrio",
398 false,
399 "int",
400 "The real-time priority of the stream writer thread." );
401
402 config.add( "writer.cpuset",
403 "",
404 "writer.cpuset",
405 argType::Required,
406 "writer",
407 "cpuset",
408 false,
409 "int",
410 "The cpuset for the writer thread." );
411
412 config.add( "writer.compress",
413 "",
414 "writer.compress",
415 argType::Required,
416 "writer",
417 "compress",
418 false,
419 "bool",
420 "Flag to set whether compression is used. Default true." );
421
422 config.add( "writer.lz4accel",
423 "",
424 "writer.lz4accel",
425 argType::Required,
426 "writer",
427 "lz4accel",
428 false,
429 "int",
430 "The LZ4 acceleration parameter. Larger is faster, but lower compression." );
431
432 config.add( "writer.outName",
433 "",
434 "writer.outName",
435 argType::Required,
436 "writer",
437 "outName",
438 false,
439 "int",
440 "The name to use for output files. Default is the shmimName." );
441
442 config.add( "framegrabber.shmimName",
443 "",
444 "framegrabber.shmimName",
445 argType::Required,
446 "framegrabber",
447 "shmimName",
448 false,
449 "int",
450 "The name of the stream to monitor. From /tmp/shmimName.im.shm." );
451
452 config.add( "framegrabber.semaphoreNumber",
453 "",
454 "framegrabber.semaphoreNumber",
455 argType::Required,
456 "framegrabber",
457 "semaphoreNumber",
458 false,
459 "int",
460 "The semaphore to wait on. Default is 7." );
461
462 config.add( "framegrabber.semWait",
463 "",
464 "framegrabber.semWait",
465 argType::Required,
466 "framegrabber",
467 "semWait",
468 false,
469 "int",
470 "The time in nsec to wait on the semaphore. Max is 999999999. Default is 5e8 nsec." );
471
472 config.add( "framegrabber.threadPrio",
473 "",
474 "framegrabber.threadPrio",
475 argType::Required,
476 "framegrabber",
477 "threadPrio",
478 false,
479 "int",
480 "The real-time priority of the framegrabber thread." );
481
482 config.add( "framegrabber.cpuset",
483 "",
484 "framegrabber.cpuset",
485 argType::Required,
486 "framegrabber",
487 "cpuset",
488 false,
489 "string",
490 "The cpuset for the framegrabber thread." );
491
492 telemeterT::setupConfig( config );
493}
494
496{
497
498 config( m_maxCircBuffLength, "writer.maxCircBuffLength" );
499 config( m_maxCircBuffSize, "writer.maxCircBuffSize" );
500 config( m_maxWriteChunkLength, "writer.maxWriteChunkLength" );
501 config( m_maxChunkTime, "writer.maxChunkTime" );
502 config( m_swThreadPrio, "writer.threadPrio" );
503 config( m_swCpuset, "writer.cpuset" );
504 config( m_compress, "writer.compress" );
505 config( m_lz4accel, "writer.lz4accel" );
507 {
509 }
511 {
513 }
514
515 config( m_shmimName, "framegrabber.shmimName" );
516
518 config( m_outName, "writer.outName" );
519
520 config( m_semaphoreNumber, "framegrabber.semaphoreNumber" );
521 config( m_semWaitNSec, "framegrabber.semWait" );
522
523 config( m_fgThreadPrio, "framegrabber.threadPrio" );
524 config( m_fgCpuset, "framegrabber.cpuset" );
525
526 // Set some defaults
527 // Setup default saving path
528 std::string tmpstr = mx::sys::getEnv( MAGAOX_env_rawimage );
529 if( tmpstr == "" )
530 {
532 }
533 m_rawimageDir = basePath() + "/" + tmpstr + "/" + m_outName;
534
535 config( m_rawimageDir, "writer.savePath" );
536
537 if( telemeterT::loadConfig( config ) < 0 )
538 {
539 log<text_log>( "Error during telemeter config", logPrio::LOG_CRITICAL );
540 m_shutdown = true;
541 }
542}
543
545{
546 // Create save directory.
547 errno = 0;
548 if( mkdir( m_rawimageDir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH ) < 0 )
549 {
550 if( errno != EEXIST )
551 {
552 std::stringstream logss;
553 logss << "Failed to create image directory (" << m_rawimageDir << "). Errno says: " << strerror( errno );
555
556 return -1;
557 }
558 }
559
560 // set up the INDI properties
563
564 // Register the stats INDI property
565 REG_INDI_NEWPROP_NOCB( m_indiP_xrifStats, "xrif", pcf::IndiProperty::Number );
566 m_indiP_xrifStats.setLabel( "xrif compression performance" );
567
568 indi::addNumberElement<float>( m_indiP_xrifStats, "ratio", 0, 1.0, 0.0, "%0.2f", "Compression Ratio" );
569
570 indi::addNumberElement<float>( m_indiP_xrifStats,
571 "differenceMBsec",
572 0,
573 std::numeric_limits<float>::max(),
574 0.0,
575 "%0.2f",
576 "Differencing Rate [MB/sec]" );
577
578 indi::addNumberElement<float>( m_indiP_xrifStats,
579 "reorderMBsec",
580 0,
581 std::numeric_limits<float>::max(),
582 0.0,
583 "%0.2f",
584 "Reordering Rate [MB/sec]" );
585
586 indi::addNumberElement<float>( m_indiP_xrifStats,
587 "compressMBsec",
588 0,
589 std::numeric_limits<float>::max(),
590 0.0,
591 "%0.2f",
592 "Compression Rate [MB/sec]" );
593
594 indi::addNumberElement<float>( m_indiP_xrifStats,
595 "encodeMBsec",
596 0,
597 std::numeric_limits<float>::max(),
598 0.0,
599 "%0.2f",
600 "Total Encoding Rate [MB/sec]" );
601
602 indi::addNumberElement<float>( m_indiP_xrifStats,
603 "differenceFPS",
604 0,
605 std::numeric_limits<float>::max(),
606 0.0,
607 "%0.2f",
608 "Differencing Rate [f.p.s.]" );
609
610 indi::addNumberElement<float>( m_indiP_xrifStats,
611 "reorderFPS",
612 0,
613 std::numeric_limits<float>::max(),
614 0.0,
615 "%0.2f",
616 "Reordering Rate [f.p.s.]" );
617
618 indi::addNumberElement<float>( m_indiP_xrifStats,
619 "compressFPS",
620 0,
621 std::numeric_limits<float>::max(),
622 0.0,
623 "%0.2f",
624 "Compression Rate [f.p.s.]" );
625
626 indi::addNumberElement<float>( m_indiP_xrifStats,
627 "encodeFPS",
628 0,
629 std::numeric_limits<float>::max(),
630 0.0,
631 "%0.2f",
632 "Total Encoding Rate [f.p.s.]" );
633
634 // Now set up the framegrabber and writer threads.
635 // - need SIGSEGV and SIGBUS handling for ImageStreamIO restarts
636 // - initialize the semaphore
637 // - start the threads
638
639 if( setSigSegvHandler() < 0 )
640 return log<software_error, -1>( { __FILE__, __LINE__ } );
641
642 if( sem_init( &m_swSemaphore, 0, 0 ) < 0 )
643 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "Initializing S.W. semaphore" } );
644
645 // Check if we have a safe writeChunkLength
647 {
648 return log<software_critical, -1>(
649 { __FILE__, __LINE__, "Write chunk length is not a divisor of circular buffer length." } );
650 }
651
652 if( initialize_xrif() < 0 )
653 {
655 }
656
663 "framegrabber",
664 this,
665 fgThreadStart ) < 0 )
666 {
667 return log<software_critical, -1>( { __FILE__, __LINE__ } );
668 }
669
676 "streamwriter",
677 this,
678 swThreadStart ) < 0 )
679 {
681 }
682
683 if( telemeterT::appStartup() < 0 )
684 {
685 return log<software_error, -1>( { __FILE__, __LINE__ } );
686 }
687
688 return 0;
689}
690
692{
693
694 // first do a join check to see if other threads have exited.
695 // these will throw if the threads are really gone
696 try
697 {
698 if( pthread_tryjoin_np( m_fgThread.native_handle(), 0 ) == 0 )
699 {
700 log<software_error>( { __FILE__, __LINE__, "framegrabber thread has exited" } );
701 return -1;
702 }
703 }
704 catch( ... )
705 {
706 log<software_error>( { __FILE__, __LINE__, "framegrabber thread has exited" } );
707 return -1;
708 }
709
710 try
711 {
712 if( pthread_tryjoin_np( m_swThread.native_handle(), 0 ) == 0 )
713 {
714 log<software_error>( { __FILE__, __LINE__, "stream thread has exited" } );
715 return -1;
716 }
717 }
718 catch( ... )
719 {
720 log<software_error>( { __FILE__, __LINE__, "streamwriter thread has exited" } );
721 return -1;
722 }
723
724 switch( m_writing )
725 {
726 case NOT_WRITING:
728 break;
729 default:
731 }
732
734 {
735 if( telemeterT::appLogic() < 0 )
736 {
738 return 0;
739 }
740 }
741
742 updateINDI();
743
744 return 0;
745}
746
748{
750 updateINDI();
751
752 try
753 {
754 if( m_fgThread.joinable() )
755 {
756 m_fgThread.join();
757 }
758 }
759 catch( ... )
760 {
761 }
762
763 try
764 {
765 if( m_swThread.joinable() )
766 {
767 m_swThread.join();
768 }
769 }
770 catch( ... )
771 {
772 }
773
774 if( m_xrif )
775 {
777 m_xrif = nullptr;
778 }
779
780 if( m_xrif_timing )
781 {
783 m_xrif_timing = nullptr;
784 }
785
787
788 return 0;
789}
790
792{
794 if( rv != XRIF_NOERROR )
795 {
796 return log<software_critical, -1>(
797 { __FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error." } );
798 }
799
800 if( m_compress )
801 {
803 if( rv != XRIF_NOERROR )
804 {
805 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
806 }
807 }
808 else
809 {
810 std::cerr << "not compressing . . . \n";
812 if( rv != XRIF_NOERROR )
813 {
814 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
815 }
816 }
817
818 errno = 0;
819
820 m_xrif_header = reinterpret_cast<char *>( malloc( XRIF_HEADER_SIZE * sizeof( char ) ) );
821 if( m_xrif_header == NULL )
822 {
823 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "xrif header allocation failed." } );
824 }
825
827 if( rv != XRIF_NOERROR )
828 {
829 return log<software_critical, -1>(
830 { __FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error." } );
831 }
832
833 // m_xrif_timing->reorder_method = XRIF_REORDER_NONE;
835 if( rv != XRIF_NOERROR )
836 {
837 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
838 }
839
840 errno = 0;
841
842 m_xrif_timing_header = reinterpret_cast<char *>( malloc( XRIF_HEADER_SIZE * sizeof( char ) ) );
844 {
845 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "xrif header allocation failed." } );
846 }
847
848 return 0;
849}
850
852{
853 struct sigaction act;
854 sigset_t set;
855
856 act.sa_sigaction = &streamWriter::_handlerSigSegv;
857 act.sa_flags = SA_SIGINFO;
858 sigemptyset( &set );
859 act.sa_mask = set;
860
861 errno = 0;
862 if( sigaction( SIGSEGV, &act, 0 ) < 0 )
863 {
864 std::string logss = "Setting handler for SIGSEGV failed. Errno says: ";
865 logss += strerror( errno );
866
868
869 return -1;
870 }
871
872 errno = 0;
873 if( sigaction( SIGBUS, &act, 0 ) < 0 )
874 {
875 std::string logss = "Setting handler for SIGBUS failed. Errno says: ";
876 logss += strerror( errno );
877
879
880 return -1;
881 }
882
883 log<text_log>( "Installed SIGSEGV/SIGBUS signal handler.", logPrio::LOG_DEBUG );
884
885 return 0;
886}
887
888void streamWriter::_handlerSigSegv( int signum, siginfo_t *siginf, void *ucont )
889{
891}
892
893void streamWriter::handlerSigSegv( int signum, siginfo_t *siginf, void *ucont ) // member-side handler for SIGSEGV/SIGBUS; only raises the restart flag
894{
895 static_cast<void>( signum ); // the three signal arguments are deliberately unused
896 static_cast<void>( siginf );
897 static_cast<void>( ucont );
898
899 m_restart = true; // polled by the framegrabber loops, which exit and re-open the stream when it is set
900 // NOTE(review): m_restart is declared as a plain bool but is written here from a signal handler and read from a thread loop -- consider volatile sig_atomic_t or std::atomic<bool>; confirm against the class declaration.
901 return;
902}
903
904void streamWriter::getCircBuffLengths( size_t &circBuffLength, // [out] buffer length, in frames
905 double &circBuffSize, // [out] buffer size, in MB
906 size_t &writeChunkLength, // [out] write chunk length, in frames
907 size_t maxCircBuffLength, // [in] upper limit on the buffer length, in frames
908 double maxCircBuffSize, // [in] upper limit on the buffer size, in MB
909 size_t maxWriteChunkLength, // [in] upper limit on the write chunk length, in frames
910 uint32_t width, // [in] frame width, in pixels
911 uint32_t height, // [in] frame height, in pixels
912 size_t typeSize ) // [in] bytes per pixel
913{
914 static constexpr double MB = 1048576.0; // bytes per MB as used here (2^20)
915
916 size_t isz = width * height * typeSize * maxCircBuffLength; // bytes needed to hold maxCircBuffLength frames. NOTE(review): width * height is evaluated in 32 bits before widening to size_t -- confirm frame sizes cannot wrap.
917
918 if( isz <= maxCircBuffSize * MB ) // the maximum-length buffer fits inside the size limit
919 {
921 circBuffSize = isz / MB;
923
924 return;
925 }
926
927 circBuffLength = maxCircBuffSize * MB / ( width * height * typeSize ); // otherwise: number of whole frames that fit in the size limit. NOTE(review): the divisor is zero for a zero-sized frame -- confirm callers exclude that.
928
929 if( circBuffLength % 2 == 1 ) // odd frame counts get special handling (statement not visible in this listing)
930 {
932 }
933
934 circBuffSize = ( width * height * typeSize * circBuffLength ) / MB; // size of the shortened buffer, in MB
935
937
938 if( circBuffLength == 0 ) // not even one frame fits; nothing further to adjust
939 {
940 return;
941 }
942
943 if( writeChunkLength == 0 )
944 {
946 }
947
948 while( circBuffLength % writeChunkLength != 0 ) // loop until the chunk length divides the buffer length evenly
949 {
951 }
952
953 return;
954}
955
957{
958
965 m_width,
966 m_height,
967 m_typeSize );
968
969 if( m_circBuffLength < 2 )
970 {
971 return log<software_critical, -1>( { __FILE__, __LINE__, "frame size too large to fit in maxCircBuffSize" } );
972 }
973
975 {
976 return log<software_critical, -1>(
977 { __FILE__, __LINE__, "writeChunkLength is not smaller than circBuffLength" } );
978 }
979
981 {
982 return log<software_critical, -1>(
983 { __FILE__, __LINE__, "writeChunkLength is not an integer factor of circBuffLength" } );
984 }
985
986 std::string msg = "Set circ buff length: " + std::to_string( m_circBuffLength ) + " frames (";
987 msg += std::to_string( m_circBuffSize ) + " MB). Write chunk length: " + std::to_string( m_writeChunkLength );
988 msg += " frames.";
989
991
993 {
995 }
996
997 errno = 0;
998 m_rawImageCircBuff = reinterpret_cast<char *>( malloc( m_width * m_height * m_typeSize * m_circBuffLength ) );
999
1000 if( m_rawImageCircBuff == NULL )
1001 {
1002 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "buffer allocation failure" } );
1003 }
1004
1005 if( m_timingCircBuff )
1006 {
1008 }
1009
1010 errno = 0;
1011 m_timingCircBuff = reinterpret_cast<uint64_t *>( malloc( 5 * sizeof( uint64_t ) * m_circBuffLength ) );
1012 if( m_timingCircBuff == NULL )
1013 {
1014 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "buffer allocation failure" } );
1015 }
1016
1017 return 0;
1018}
1019
1021{
1022 // Set up the image data xrif handle
1024
1025 if( m_compress )
1026 {
1028 if( rv != XRIF_NOERROR )
1029 {
1030 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1031 }
1032 }
1033 else
1034 {
1035 std::cerr << "not compressing . . . \n";
1037 if( rv != XRIF_NOERROR )
1038 {
1039 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1040 }
1041 }
1042
1044 if( rv != XRIF_NOERROR )
1045 {
1046 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_set_size error." } );
1047 }
1048
1050 if( rv != XRIF_NOERROR )
1051 {
1052 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_raw error." } );
1053 }
1054
1056 if( rv != XRIF_NOERROR )
1057 {
1058 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error." } );
1059 }
1060
1061 // Set up the timing data xrif handle
1063 if( rv != XRIF_NOERROR )
1064 {
1065 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1066 }
1067
1069 if( rv != XRIF_NOERROR )
1070 {
1071 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_set_size error." } );
1072 }
1073
1075 if( rv != XRIF_NOERROR )
1076 {
1077 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_raw error." } );
1078 }
1079
1081 if( rv != XRIF_NOERROR )
1082 {
1083 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error." } );
1084 }
1085
1086 return 0;
1087}
1088
1090{
1091 o->fgThreadExec();
1092}
1093
1095{
1097
1098 // Wait for the thread starter to finish initializing this thread.
1099 while( m_fgThreadInit == true && m_shutdown == 0 )
1100 {
1101 sleep( 1 );
1102 }
1103
1104 timespec missing_ts;
1105
1106 IMAGE image;
1107 ino_t inode = 0; // The inode of the image stream file
1108
1109 bool opened = false;
1110
1111 while( m_shutdown == 0 )
1112 {
1113 /* Initialize ImageStreamIO
1114 */
1115 opened = false;
1116 m_restart = false; // Set this up front, since we're about to restart.
1117
1118 sem_t *sem{ nullptr }; ///< The semaphore to monitor for new image data
1119
1120 int logged = 0;
1121 while( !opened && !m_shutdown && !m_restart )
1122 {
1123 // b/c ImageStreamIO prints every single time, and latest version don't support stopping it yet,
1124 // and that isn't thread-safe-able anyway
1125 // we do our own checks. This is the same code in ImageStreamIO_openIm...
1126 int SM_fd;
1127 char SM_fname[200];
1128 ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1129 SM_fd = open( SM_fname, O_RDWR );
1130 if( SM_fd == -1 )
1131 {
1132 if( !logged )
1133 {
1134 log<text_log>( "ImageStream " + m_shmimName + " not found (yet). Retrying . . .",
1136 }
1137 logged = 1;
1138 sleep( 1 ); // be patient
1139 continue;
1140 }
1141
1142 // Found and opened, close it and then use ImageStreamIO
1143 logged = 0;
1144 close( SM_fd );
1145
1146 if( ImageStreamIO_openIm( &image, m_shmimName.c_str() ) == 0 )
1147 {
1148 if( image.md[0].sem <=
1149 m_semaphoreNumber ) ///<\todo this isn't right--> isn't there a define in cacao to use?
1150 {
1152 mx::sys::sleep( 1 ); // We just need to wait for the server process to finish startup.
1153 }
1154 else
1155 {
1156 opened = true;
1157
1158 char SM_fname[200];
1159 ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1160
1161 struct stat buffer;
1162 int rv = stat( SM_fname, &buffer );
1163
1164 if( rv != 0 )
1165 {
1167 __LINE__,
1168 errno,
1169 "Could not get inode for " + m_shmimName +
1170 ". Source process will need to be restarted." } );
1172 return;
1173 }
1174 inode = buffer.st_ino;
1175 }
1176 }
1177 else
1178 {
1179 mx::sys::sleep( 1 ); // be patient
1180 }
1181 }
1182
1183 if( m_restart )
1184 continue; // this is kinda dumb. we just go around on restart, so why test in the while loop at all?
1185
1186 if( m_shutdown || !opened )
1187 {
1188 if( !opened )
1189 return;
1190
1192 return;
1193 }
1194
1195 // now get a good semaphore
1197 ImageStreamIO_getsemwaitindex( &image, m_semaphoreNumber ); // ask for semaphore we had before
1198
1199 if( m_semaphoreNumber < 0 )
1200 {
1202 { __FILE__,
1203 __LINE__,
1204 "No valid semaphore found for " + m_shmimName + ". Source process will need to be restarted." } );
1205 return;
1206 }
1207
1209 __LINE__,
1210 "got semaphore index " + std::to_string( m_semaphoreNumber ) + " for " + m_shmimName } );
1211
1213
1214 sem = image.semptr[m_semaphoreNumber]; ///< The semaphore to monitor for new image data
1215
1216 m_dataType = image.md[0].datatype;
1218 m_width = image.md[0].size[0];
1219 m_height = image.md[0].size[1];
1220 size_t length;
1221 if( image.md[0].naxis == 3 )
1222 {
1223 length = image.md[0].size[2];
1224 }
1225 else
1226 {
1227 length = 1;
1228 }
1229 std::cerr << "connected"
1230 << " " << m_width << "x" << m_height << "x" << (int)m_dataType << " (" << m_typeSize << ")"
1231 << std::endl;
1232
1233 // Now allocate the circBuffs
1234 if( allocate_circbufs() < 0 )
1235 {
1236 return; // will cause shutdown!
1237 }
1238
1239 // And allocate the xrifs
1240 if( allocate_xrif() < 0 )
1241 {
1242 return; // Will cause shutdown!
1243 }
1244
1245 uint8_t atype;
1246 size_t snx, sny, snz;
1247
1248 uint64_t curr_image; // The current cnt1 index
1249 m_currImage = 0;
1250 m_currChunkStart = 0;
1251 m_nextChunkStart = 0;
1252
1253 // Initialized curr_image ...
1254 if( image.md[0].naxis > 2 )
1255 {
1256 curr_image = image.md[0].cnt1;
1257 }
1258 else
1259 {
1260 curr_image = 0;
1261 }
1262
1263 uint64_t last_cnt0; // = ((uint64_t)-1);
1264
1265 // so we can initialize last_cnt0 to avoid frame skip on startup
1266 if( image.cntarray )
1267 {
1268 last_cnt0 = image.cntarray[curr_image];
1269 }
1270 else
1271 {
1272 last_cnt0 = image.md[0].cnt0;
1273 }
1274
1275 int cnt0flag = 0;
1276
1277 bool restartWriting = false; // flag to prevent logging on a logging restart
1278
1279 // This is the main image grabbing loop.
1280 while( !m_shutdown && !m_restart )
1281 {
1282 timespec ts;
1284
1285 if( sem_timedwait( sem, &ts ) == 0 )
1286 {
1287 if( image.md[0].naxis > 2 )
1288 {
1289 curr_image = image.md[0].cnt1;
1290 }
1291 else
1292 {
1293 curr_image = 0;
1294 }
1295
1296 atype = image.md[0].datatype;
1297 snx = image.md[0].size[0];
1298 sny = image.md[0].size[1];
1299 if( image.md[0].naxis == 3 )
1300 {
1301 snz = image.md[0].size[2];
1302 }
1303 else
1304 {
1305 snz = 1;
1306 }
1307
1308 if( atype != m_dataType || snx != m_width || sny != m_height || snz != length )
1309 {
1310 break; // exit the nearest while loop and get the new image setup.
1311 }
1312
1313 if( m_shutdown || m_restart )
1314 {
1315 break; // Check for exit signals
1316 }
1317
1319 if( image.cntarray )
1320 {
1321 new_cnt0 = image.cntarray[curr_image];
1322 }
1323 else
1324 {
1325 new_cnt0 = image.md[0].cnt0;
1326 }
1327
1328#ifdef SW_DEBUG
1329 std::cerr << "new_cnt0: " << new_cnt0 << "\n";
1330#endif
1331
1332 ///\todo cleanup skip frame handling.
1333 if( new_cnt0 == last_cnt0 ) //<- this probably isn't useful really
1334 {
1335 log<text_log>( "semaphore raised but cnt0 has not changed -- we're probably getting behind",
1337 ++cnt0flag;
1338 if( cnt0flag > 10 )
1339 {
1340 m_restart = true; // if we get here 10 times then something else is wrong.
1341 }
1342 continue;
1343 }
1344
1345 if( new_cnt0 - last_cnt0 > 1 ) //<- this is what we want to check.
1346 {
1347 log<text_log>( "cnt0 changed by more than 1. Frame skipped.", logPrio::LOG_WARNING );
1348 }
1349
1350 cnt0flag = 0;
1351
1353
1355 char *curr_src =
1356 reinterpret_cast<char *>( image.array.raw ) + curr_image * m_width * m_height * m_typeSize;
1357
1359
1361
1362 if( image.cntarray )
1363 {
1364 curr_timing[0] = image.cntarray[curr_image];
1365 curr_timing[1] = image.atimearray[curr_image].tv_sec;
1366 curr_timing[2] = image.atimearray[curr_image].tv_nsec;
1367 curr_timing[3] = image.writetimearray[curr_image].tv_sec;
1368 curr_timing[4] = image.writetimearray[curr_image].tv_nsec;
1369 }
1370 else
1371 {
1372 curr_timing[0] = image.md[0].cnt0;
1373 curr_timing[1] = image.md[0].atime.tv_sec;
1374 curr_timing[2] = image.md[0].atime.tv_nsec;
1375 curr_timing[3] = image.md[0].writetime.tv_sec;
1376 curr_timing[4] = image.md[0].writetime.tv_nsec;
1377 }
1378
1379 // Check if we need to time-stamp ourselves -- for old cacao streams
1380 if( curr_timing[1] == 0 )
1381 {
1382
1384 {
1385 log<software_critical>( { __FILE__, __LINE__, errno, 0, "clock_gettime" } );
1386 return;
1387 }
1388
1389 curr_timing[1] = missing_ts.tv_sec;
1390 curr_timing[2] = missing_ts.tv_nsec;
1391 }
1392
1393 // just set w-time to a-time if it's missing
1394 if( curr_timing[3] == 0 )
1395 {
1396 curr_timing[3] = curr_timing[1];
1397 curr_timing[4] = curr_timing[2];
1398 }
1399
1400 m_currImageTime = 1.0 * curr_timing[3] + ( 1.0 * curr_timing[4] ) / 1e9;
1401
1402 if( m_shutdown && m_writing == WRITING )
1403 {
1405 }
1406
1407 switch( m_writing )
1408 {
1409 case START_WRITING:
1410
1414
1415 if( !restartWriting ) // We only log if this is really a start
1416 {
1417 log<saving_start>( { 1, new_cnt0 } );
1418 }
1419 else // on a restart after a timeout we don't log
1420 {
1421 restartWriting = false;
1422 }
1423
1425
1426 // fall through
1427 case WRITING:
1429 {
1433
1434#ifdef SW_DEBUG
1435 std::cerr << __FILE__ << " " << __LINE__ << " WRITING " << m_currImage << " "
1436 << m_nextChunkStart << " "
1437 << ( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 ) << " "
1439 << "\n";
1440#endif
1441
1442 // Now tell the writer to get going
1443 if( sem_post( &m_swSemaphore ) < 0 )
1444 {
1445 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1446 return;
1447 }
1448
1451 {
1452 m_nextChunkStart = 0;
1453 }
1454
1457 }
1459 {
1463
1464#ifdef SW_DEBUG
1465 std::cerr << __FILE__ << " " << __LINE__ << " IMAGE TIME WRITING " << m_currImage << " "
1466 << m_nextChunkStart << " "
1467 << ( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 ) << " "
1469 << "\n";
1470#endif
1471
1472 // Now tell the writer to get going
1473 if( sem_post( &m_swSemaphore ) < 0 )
1474 {
1475 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1476 return;
1477 }
1478
1480 restartWriting = true;
1481 }
1482 break;
1483
1484 case STOP_WRITING:
1488
1489#ifdef SW_DEBUG
1490 std::cerr << __FILE__ << " " << __LINE__ << " STOP_WRITING\n";
1491#endif
1492
1493 // Now tell the writer to get going
1494 if( sem_post( &m_swSemaphore ) < 0 )
1495 {
1496 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1497 return;
1498 }
1499 restartWriting = false;
1500 break;
1501
1502 default:
1503 break;
1504 }
1505
1506 ++m_currImage;
1508 {
1509 m_currImage = 0;
1510 }
1511 }
1512 else
1513 {
1514 // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1515 // we can also get here if a signal interrupts the sem wait which is triggered by INDI callbacks
1516 switch( m_writing )
1517 {
1518 case WRITING:
1519 // Here, if there is at least 1 image, we check for delta-time > m_maxChunkTime
1520 // then write
1521 if( ( m_currImage - m_nextChunkStart > 0 ) &&
1522 ( mx::sys::get_curr_time() - m_currChunkStartTime > m_maxChunkTime ) )
1523 {
1527
1528#ifdef SW_DEBUG
1529 std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT WRITING " << " " << m_currImage << " "
1530 << m_nextChunkStart << " " << ( m_currImage - m_nextChunkStart ) << " " << last_cnt0
1531 << "\n";
1532#endif
1533
1534 // Now tell the writer to get going
1535 if( sem_post( &m_swSemaphore ) < 0 )
1536 {
1537 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1538 return;
1539 }
1540
1542 restartWriting = true;
1543 }
1544 break;
1545 case STOP_WRITING:
1546 // If we timed-out while STOP_WRITING is set, we trigger a write.
1550
1551#ifdef SW_DEBUG
1552 std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT STOP_WRITING\n";
1553#endif
1554
1555 // Now tell the writer to get going
1556 if( sem_post( &m_swSemaphore ) < 0 )
1557 {
1558 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1559 return;
1560 }
1561 restartWriting = false;
1562 break;
1563 default:
1564 break;
1565 }
1566
1567 if( image.md[0].sem <= 0 )
1568 {
1569 break; // Indicates that the server has cleaned up.
1570 }
1571
1572 // Check for why we timed out
1573 if( errno == EINTR )
1574 {
1575 break; // This will indicate time to shutdown, loop will exit normally flags set.
1576 }
1577
1578 // ETIMEDOUT just means we should wait more.
1579 // Otherwise, report an error.
1580 if( errno != ETIMEDOUT )
1581 {
1582 log<software_error>( { __FILE__, __LINE__, errno, "sem_timedwait" } );
1583 break;
1584 }
1585
1586 // Check if the file has disappeared.
1587 int SM_fd;
1588 char SM_fname[200];
1589 ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1590 SM_fd = open( SM_fname, O_RDWR );
1591 if( SM_fd == -1 )
1592 {
1593 m_restart = true;
1594 }
1595 close( SM_fd );
1596
1597 // Check if the inode changed
1598 struct stat buffer;
1599 int rv = stat( SM_fname, &buffer );
1600 if( rv != 0 )
1601 {
1602 m_restart = true;
1603 }
1604
1605 if( buffer.st_ino != inode )
1606 {
1607#ifdef SW_DEBUG
1608 std::cerr << "Restarting due to inode . . . \n";
1609#endif
1610 m_restart = true;
1611 }
1612 }
1613 }
1614
1615 ///\todo might still be writing here, so must check
1616 // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1618 {
1619 // Here, if there is at least 1 image, then write
1620 if( ( m_currImage - m_nextChunkStart > 0 ) )
1621 {
1625
1627
1628 std::cerr << __FILE__ << " " << __LINE__ << " WRITING ON RESTART " << last_cnt0 << "\n";
1629 // Now tell the writer to get going
1630 if( sem_post( &m_swSemaphore ) < 0 )
1631 {
1632 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1633 return;
1634 }
1635 }
1636 else
1637 {
1639 }
1640
1641 while( m_writing != NOT_WRITING )
1642 {
1643 std::cerr << __FILE__ << " " << __LINE__ << " WAITING TO FINISH WRITING " << last_cnt0 << "\n";
1644 sleep( 1 );
1645 }
1646 }
1647
1648 if( m_rawImageCircBuff )
1649 {
1652 }
1653
1654 if( m_timingCircBuff )
1655 {
1657 m_timingCircBuff = 0;
1658 }
1659
1660 if( opened )
1661 {
1662 if( m_semaphoreNumber >= 0 )
1663 {
1664 ///\todo is this release necessary with closeIM?
1665 image.semReadPID[m_semaphoreNumber] = 0; // release semaphore
1666 }
1668 opened = false;
1669 }
1670
1671 } // outer loop, will exit if m_shutdown==true
1672
1673 // One more check
1674 if( m_rawImageCircBuff )
1675 {
1678 }
1679
1680 if( m_timingCircBuff )
1681 {
1683 m_timingCircBuff = 0;
1684 }
1685
1686 if( opened )
1687 {
1688 if( m_semaphoreNumber >= 0 )
1689 {
1690 ///\todo is this release necessary with closeIM?
1691 image.semReadPID[m_semaphoreNumber] = 0; // release semaphore.
1692 }
1693
1695 }
1696}
1697
1699{
    // Thread entry helper: hands control to the swThreadExec() member of the object `s`.
    // NOTE(review): `s` is dereferenced without a null check -- confirm callers always pass a valid pointer.
1700 s->swThreadExec();
1701}
1702
1704{
    // Main loop of the stream-writer thread: wait on m_swSemaphore and call doEncode() each
    // time the semaphore is acquired, until m_shutdown is set or an unrecoverable error occurs.
1706
1707 // Wait for the thread starter to finish initializing this thread.
1708 while( m_swThreadInit == true && m_shutdown == 0 )
1709 {
1710 sleep( 1 );
1711 }
1712
1713 while( !m_shutdown )
1714 {
    // Idle in 1 second steps until the application state is READY or OPERATING (or shutdown is requested).
1715 while( !shutdown() && ( !( state() == stateCodes::READY || state() == stateCodes::OPERATING ) ) )
1716 {
1717 sleep( 1 );
1718 }
1719
1720 if( shutdown() )
1721 {
1722 break;
1723 }
1724
1725 timespec ts;
1726
    // sem_timedwait takes an absolute CLOCK_REALTIME deadline, so start from the current time.
1727 if( clock_gettime( CLOCK_REALTIME, &ts ) < 0 )
1728 {
1729 log<software_critical>( { __FILE__, __LINE__, errno, 0, "clock_gettime" } );
1730 return; // will trigger a shutdown
1731 }
1732
    // Deadline = now + m_semWaitNSec nanoseconds.
1733 mx::sys::timespecAddNsec( ts, m_semWaitNSec );
1734
1735 if( sem_timedwait( &m_swSemaphore, &ts ) == 0 )
1736 {
    // Semaphore acquired: encode and write. A negative return ends this thread.
1737 if( doEncode() < 0 )
1738 {
1739 log<software_critical>( { __FILE__, __LINE__, "error encoding data" } );
1740 return;
1741 }
1742 // Otherwise, success, and we just go on.
1743 }
1744 else
1745 {
1746 // Check for why we timed out
1747 if( errno == EINTR )
1748 {
1749 continue; // This will probably indicate time to shutdown, loop will exit normally if flags set.
1750 }
1751
1752 // ETIMEDOUT just means we should wait more.
1753 // Otherwise, report an error.
1754 if( errno != ETIMEDOUT )
1755 {
1756 log<software_error>( { __FILE__, __LINE__, errno, "sem_timedwait" } );
1757 break;
1758 }
1759 }
1760 } // outer loop, will exit if m_shutdown==true
1761}
1762
1764{
    // Encode the pending frames and their timing records with xrif and write both to a single
    // time-stamped ".xrif" file under m_rawimageDir.
    //
    // Returns 0 when there is nothing to do or when the write path ran to completion (individual
    // xrif/fwrite failures are logged and execution continues). Returns the result of
    // log<..., -1>() (presumably -1 -- confirm against the log template) when the output path
    // cannot be derived, the directory cannot be created, or the file cannot be opened.
1765 if( m_writing == NOT_WRITING )
1766 {
1767 return 0;
1768 }
1769
1770 recordSavingState( true );
1771
1772 // Record these to prevent a change in other thread
1775 size_t nFrames = m_currSaveStop - saveStart;
1776 size_t nBytes = m_width * m_height * m_typeSize;
1777
1778#ifdef SW_DEBUG
1779 std::cerr << "nFrames: " << nFrames << "\n";
1780#endif
1781
1782 if( nFrames == 0 ) // can happen during a stop. Just clean up, there is nothing to write.
1783 {
1784#ifdef SW_DEBUG
1785 std::cerr << "nothing to write\n";
1786#endif
1787
1788 recordSavingStats( true );
1789
1790 if( m_writing == STOP_WRITING )
1791 {
1794 }
1795
1796 recordSavingState( true );
1797
1798 return 0;
1799 }
1800 // Configure xrif and copy image data -- this does no allocations
1802 if( rv != XRIF_NOERROR )
1803 {
1804 // This is a big problem. Report it as "ALERT" and go on.
1805 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST" } );
1806 }
1807
1809 if( rv != XRIF_NOERROR )
1810 {
1811 // This may just be out of range, it's only an error.
1812 log<software_error>( { __FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error." } );
1813 }
1814
1816
1817 // Configure xrif and copy timing data -- no allocations
1819 if( rv != XRIF_NOERROR )
1820 {
1821 // This is a big problem. Report it as "ALERT" and go on.
1822 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST." } );
1823 }
1824
1826 if( rv != XRIF_NOERROR )
1827 {
1828 // This may just be out of range, it's only an error.
1829 log<software_error>( { __FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error." } );
1830 }
1831
1832#ifdef SW_DEBUG
1833 for( size_t nF = 0; nF < nFrames; ++nF )
1834 {
1835 std::cerr << " " << ( m_timingCircBuff + saveStart * 5 + nF * 5 )[0] << "\n";
1836 }
1837#endif
1838
    // Each frame has 5 uint64_t timing values in m_timingCircBuff; copy the nFrames records
    // starting at saveStart into the timing handle's raw buffer.
1839 memcpy( m_xrif_timing->raw_buffer, m_timingCircBuff + saveStart * 5, nFrames * 5 * sizeof( uint64_t ) );
1840
1841 rv = xrif_encode( m_xrif );
1842 if( rv != XRIF_NOERROR )
1843 {
1844 // This is a big problem. Report it as "ALERT" and go on.
1845 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST." } );
1846 }
1847
1849 if( rv != XRIF_NOERROR )
1850 {
1851 // This is a big problem. Report it as "ALERT" and go on.
1852 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST." } );
1853 }
1854
1856 if( rv != XRIF_NOERROR )
1857 {
1858 // This is a big problem. Report it as "ALERT" and go on.
1859 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST." } );
1860 }
1861
1863 if( rv != XRIF_NOERROR )
1864 {
1865 // This is a big problem. Report it as "ALERT" and go on.
1866 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST" } );
1867 }
1868
1869 // Now break down the acq time of the first image in the buffer for use in file name
1870 // tm uttime; // The broken down time.
    // Elements [1] and [2] of the first saved timing record are viewed in place as a timespec.
    // NOTE(review): this assumes timespec is laid out as two consecutive 64-bit fields -- confirm for the target platform.
1871 timespec *fts = reinterpret_cast<timespec *>( m_timingCircBuff + saveStart * 5 + 1 );
1872
1873 std::string fileName;
1874 std::string relPath;
1875 mx::error_t errc = file::fileTimeRelPath( fileName, relPath, m_outName, "xrif", fts->tv_sec, fts->tv_nsec );
1876 if(errc != mx::error_t::noerror)
1877 {
1878 std::string msg = "error from file::fileTimeRelPath: ";
1879 msg += mx::errorMessage(errc);
1880 msg += " (" + std::string(mx::errorName(errc)) + ")";
1881 return log<software_error, -1>( { __FILE__, __LINE__, msg } );
1882 }
1883
1884 std::string fullPath = m_rawimageDir + '/' + relPath;
1885
1886 try
1887 {
1888 std::filesystem::create_directories( fullPath ); // this does nothing if fname already exists
1889 }
1890 catch( const std::filesystem::filesystem_error &e )
1891 {
1892 std::string msg = "filesystem_error from std::create_directories. ";
1893 msg += e.what();
1894 msg += " code: ";
    // Appending the int directly would select operator+=(char) and add one character whose
    // value is the error code, so convert it to decimal text first.
1895 msg += std::to_string( e.code().value() );
1896 return log<software_critical, -1>( { __FILE__, __LINE__, msg } );
1897 }
1898 catch( const std::exception &e )
1899 {
1900 std::string msg = "exception from std::create_directories. ";
1901 msg += e.what();
1902 return log<software_critical, -1>( { __FILE__, __LINE__, msg } );
1903 }
1904
1906 FILE *fp_xrif = fopen( m_outFilePath.c_str(), "wb" );
1907 if( fp_xrif == NULL )
1908 {
1909 // This is it. If we can't write data to disk need to fix.
1910 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "failed to open file for writing" } );
1911 }
1912
    // The file receives, in order: image header, image data, timing header, timing data.
    // A short write is logged but does not stop the remaining writes.
1913 size_t bw = fwrite( m_xrif_header, sizeof( uint8_t ), XRIF_HEADER_SIZE, fp_xrif );
1914
1915 if( bw != XRIF_HEADER_SIZE )
1916 {
1918 __LINE__,
1919 errno,
1920 0,
1921 "failure writing header to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1922 // We go on . . .
1923 }
1924
1925 bw = fwrite( m_xrif->raw_buffer, sizeof( uint8_t ), m_xrif->compressed_size, fp_xrif );
1926
1927 if( bw != m_xrif->compressed_size )
1928 {
1930 __LINE__,
1931 errno,
1932 0,
1933 "failure writing data to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1934 }
1935
1937
1938 if( bw != XRIF_HEADER_SIZE )
1939 {
1941 { __FILE__,
1942 __LINE__,
1943 errno,
1944 0,
1945 "failure writing timing header to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1946 }
1947
1948 bw = fwrite( m_xrif_timing->raw_buffer, sizeof( uint8_t ), m_xrif_timing->compressed_size, fp_xrif );
1949
1950 if( bw != m_xrif_timing->compressed_size )
1951 {
1953 { __FILE__,
1954 __LINE__,
1955 errno,
1956 0,
1957 "failure writing timing data to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1958 }
1959
1960 fclose( fp_xrif );
1961
1962 recordSavingStats( true );
1963
1964 if( m_writing == STOP_WRITING )
1965 {
1968 }
1969
1970 recordSavingState( true );
1971
1972 return 0;
1973
1974} // doEncode
1975
1976INDI_NEWCALLBACK_DEFN( streamWriter, m_indiP_writing )
1977( const pcf::IndiProperty &ipRecv )
1978{
1979 INDI_VALIDATE_CALLBACK_PROPS( m_indiP_writing, ipRecv );
1980
1981 if( !ipRecv.find( "toggle" ) )
1982 {
1983 return 0;
1984 }
1985
1986 if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::Off &&
1987 ( m_writing == WRITING || m_writing == START_WRITING ) )
1988 {
1989 m_writing = STOP_WRITING;
1990 }
1991
1992 if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::On && m_writing == NOT_WRITING )
1993 {
1994 m_writing = START_WRITING;
1995 }
1996
1997 return 0;
1998}
1999
2001{
    // Publish the writing toggle and, while writing, the xrif statistics to INDI.
    // Nothing is published while m_writing is START_WRITING or STOP_WRITING.
2002 // Only update this if not changing
2003 if( m_writing == NOT_WRITING || m_writing == WRITING )
2004 {
2005 if( m_xrif && m_writing == WRITING )
2006 {
    // Writing with a valid xrif handle: toggle is On and the statistics are refreshed.
    // The "*MBsec" elements are a rate divided by 1048576.0 (2^20); the "*FPS" elements are the
    // same rate divided by the byte size of one frame (m_width * m_height * m_typeSize).
    // NOTE(review): presumably the xrif rates are in bytes per second -- confirm against the xrif documentation.
2007 indi::updateSwitchIfChanged( m_indiP_writing, "toggle", pcf::IndiElement::On, m_indiDriver, INDI_OK );
2008 indi::updateIfChanged( m_indiP_xrifStats, "ratio", m_xrif->compression_ratio, m_indiDriver, INDI_BUSY );
2010 m_indiP_xrifStats, "encodeMBsec", m_xrif->encode_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2012 "encodeFPS",
2013 m_xrif->encode_rate / ( m_width * m_height * m_typeSize ),
2015 INDI_BUSY );
2017 m_indiP_xrifStats, "differenceMBsec", m_xrif->difference_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2019 "differenceFPS",
2020 m_xrif->difference_rate / ( m_width * m_height * m_typeSize ),
2022 INDI_BUSY );
2024 m_indiP_xrifStats, "reorderMBsec", m_xrif->reorder_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2026 "reorderFPS",
2027 m_xrif->reorder_rate / ( m_width * m_height * m_typeSize ),
2029 INDI_BUSY );
2031 m_indiP_xrifStats, "compressMBsec", m_xrif->compress_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2033 "compressFPS",
2034 m_xrif->compress_rate / ( m_width * m_height * m_typeSize ),
2036 INDI_BUSY );
2037 }
2038 else
2039 {
    // Not writing (or no xrif handle): report the toggle as Off.
2040 indi::updateSwitchIfChanged( m_indiP_writing, "toggle", pcf::IndiElement::Off, m_indiDriver, INDI_OK );
2050 }
2051 }
2052}
2053
2058
2060{
    // Delegates to recordSavingState() with force = true and returns its result.
2061 return recordSavingState( true );
2062}
2063
2065{
    // Derives a 0/1 saving state from m_writing and remembers the last value seen.
    // Always returns 0 in the code visible here.
    // The function-local statics keep their values between calls.
2066 static int16_t lastState = -1;
    // -1 assigned to an unsigned 64-bit value wraps to its maximum.
    // NOTE(review): presumably a "never recorded" sentinel -- confirm.
2067 static uint64_t currSaveStart = -1;
2068
2069 int16_t state;
    // state is 1 when the condition ending in STOP_WRITING holds, otherwise 0.
2071 m_writing == STOP_WRITING ) // Changed from just writing 5/2024
2072 state = 1;
2073 else
2074 state = 0;
2075
2077 {
2079
2080 lastState = state;
2082 }
2083
2084 return 0;
2085}
2086
2088{
    // Emit a telem_saving record with the current xrif sizes and rates when any of them differs
    // from the values cached at the previous record, or when `force` is true. Always returns 0.
    //
    // The cache lives in function-local statics, so it persists between calls.
    // NOTE(review): m_xrif is dereferenced here without a null check -- confirm it is always
    // allocated before this is called.
2089 static uint32_t last_rawSize = -1;
2090 static uint32_t last_compressedSize = -1;
2091 static float last_encodeRate = -1;
2092 static float last_differenceRate = -1;
2093 static float last_reorderRate = -1;
2094 static float last_compressRate = -1;
2095
    // NOTE(review): the cached values are uint32_t / float while the telem call below casts the
    // xrif fields to those types, which suggests the fields are wider. If so, these comparisons
    // can report a change even when the narrowed values are equal -- confirm.
2096 if( m_xrif->raw_size != last_rawSize || m_xrif->compressed_size != last_compressedSize ||
2097 m_xrif->encode_rate != last_encodeRate || m_xrif->difference_rate != last_differenceRate ||
2098 m_xrif->reorder_rate != last_reorderRate || m_xrif->compress_rate != last_compressRate || force )
2099 {
2100 telem<telem_saving>( { (uint32_t)m_xrif->raw_size,
2101 (uint32_t)m_xrif->compressed_size,
2102 (float)m_xrif->encode_rate,
2103 (float)m_xrif->difference_rate,
2104 (float)m_xrif->reorder_rate,
2105 (float)m_xrif->compress_rate } );
2106
    // Refresh the cache so the next call compares against what was just recorded.
2107 last_rawSize = m_xrif->raw_size;
2108 last_compressedSize = m_xrif->compressed_size;
2109 last_encodeRate = m_xrif->encode_rate;
2110 last_differenceRate = m_xrif->difference_rate;
2111 last_reorderRate = m_xrif->reorder_rate;
2112 last_compressRate = m_xrif->compress_rate;
2113 }
2114
2115 return 0;
2116}
2117
2118} // namespace app
2119} // namespace MagAOX
2120
2121#endif
The base-class for XWCTk applications.
std::string basePath()
Get the.
stateCodes::stateCodeT state()
Get the current state code.
int registerIndiPropertyNew(pcf::IndiProperty &prop, int(*)(void *, const pcf::IndiProperty &))
Register an INDI property which is exposed for others to request a New Property for.
int createStandardIndiToggleSw(pcf::IndiProperty &prop, const std::string &name, const std::string &label="", const std::string &group="")
Create a standard R/W INDI switch with a single toggle element.
int m_shutdown
Flag to signal it's time to shutdown. When not 0, the main loop exits.
int shutdown()
Get the value of the shutdown flag.
indiDriver< MagAOXApp > * m_indiDriver
The INDI driver wrapper. Constructed and initialized by execute, which starts and stops communication...
static int log(const typename logT::messageT &msg, logPrioT level=logPrio::LOG_DEFAULT)
Make a log entry.
int threadStart(std::thread &thrd, bool &thrdInit, pid_t &tpid, pcf::IndiProperty &thProp, int thrdPrio, const std::string &cpuset, const std::string &thrdName, thisPtr *thrdThis, Function &&thrdStart)
Start a thread, using this class's privileges to set priority, etc.
pcf::IndiProperty m_indiP_xrifStats
pcf::IndiProperty m_indiP_writing
double m_currImageTime
The write-time of the current image.
std::thread m_fgThread
A separate thread for the actual framegrabbings.
int m_fgThreadPrio
Priority of the framegrabber thread, should normally be > 0.
pid_t m_fgThreadID
F.g. thread PID.
size_t m_circBuffLength
The length of the circular buffer, in frames.
int initialize_xrif()
Initialize the xrif system.
int recordTelem(const telem_saving_state *)
uint8_t m_dataType
The ImageStreamIO type code.
bool m_fgThreadInit
Synchronizer to ensure f.g. thread initializes before doing dangerous things.
int m_typeSize
The pixel byte depth.
xrif_t m_xrif_timing
The xrif compression handle for image timing data.
uint64_t m_currSaveStart
The circular buffer position at which to start saving.
virtual int appStartup()
Startup functions.
size_t m_writeChunkLength
The number of frames to write at a time.
size_t m_height
The height of the image.
uint64_t m_currSaveStopFrameNo
The frame number of the image at which saving stopped (for logging)
double m_currChunkStartTime
The write-time of the first image in the chunk.
int allocate_circbufs()
Worker function to allocate the circular buffers.
static void getCircBuffLengths(size_t &circBuffLength, double &circBuffSize, size_t &writeChunkLength, size_t maxCircBuffLength, double maxCircBuffSize, size_t maxWriteChunkLength, uint32_t width, uint32_t height, size_t typeSize)
void handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
Handles SIGSEGV and SIGBUS. Sets m_restart to true.
int m_swThreadPrio
Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
int recordSavingStats(bool force=false)
pcf::IndiProperty m_swThreadProp
The property to hold the s.w. thread details.
std::thread m_swThread
A separate thread for the actual writing.
std::string m_fgCpuset
The cpuset for the framegrabber thread. Ignored if empty (the default).
virtual int appLogic()
Implementation of the FSM for the stream writer.
static void fgThreadStart(streamWriter *s)
Thread starter, called by threadStart on thread construction. Calls fgThreadExec.
std::string m_outName
The name to use for outputting files, Default is m_shmimName.
size_t m_maxCircBuffLength
The maximum length of the circular buffer, in frames.
INDI_NEWCALLBACK_DECL(streamWriter, m_indiP_writing)
char * m_xrif_timing_header
Storage for the xrif timing data file header.
~streamWriter() noexcept
Destructor.
int allocate_xrif()
Worker function to configure and allocate the xrif handles.
int setSigSegvHandler()
Sets the handler for SIGSEGV and SIGBUS.
std::string m_swCpuset
The cpuset for the stream writer thread. Ignored if empty (the default).
xrif_t m_xrif
The xrif compression handle for image data.
virtual void setupConfig()
Setup the configuration system (called by MagAOXApp::setup())
int m_semaphoreNumber
The image structure semaphore index.
std::string m_rawimageDir
The path where files will be saved.
static void _handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
int recordSavingState(bool force=false)
uint64_t m_currChunkStart
The circular buffer starting position of the current to-be-written chunk.
pid_t m_swThreadID
S.w. thread pid.
virtual void loadConfig()
load the configuration system results (called by MagAOXApp::setup())
pcf::IndiProperty m_fgThreadProp
The property to hold the f.g. thread details.
void swThreadExec()
Execute the stream writer main loop.
uint64_t m_nextChunkStart
The circular buffer starting position of the next to-be-written chunk.
static streamWriter * m_selfWriter
char * m_xrif_header
Storage for the xrif image data file header.
double m_maxChunkTime
The maximum time before writing regardless of number of frames.
double m_maxCircBuffSize
The maximum size of the circular buffer in MB.
int doEncode()
Function called when semaphore is raised to do the encode and write.
static void swThreadStart(streamWriter *s)
Thread starter, called by threadStart on thread construction. Calls swThreadExec.
uint64_t m_currSaveStop
The circular buffer position at which to stop saving.
void fgThreadExec()
Execute the frame grabber main loop.
std::string m_outFilePath
The full path for the latest output file.
std::string m_shmimName
The name of the shared memory buffer.
dev::telemeter< streamWriter > telemeterT
size_t m_width
The width of the image.
double m_circBuffSize
The size of the circular buffer, in MB.
virtual int appShutdown()
Do any needed shutdown tasks. Currently nothing in this app.
sem_t m_swSemaphore
Semaphore used to synchronize the fg thread and the sw thread.
bool m_swThreadInit
Synchronizer to ensure s.w. thread initializes before doing dangerous things.
#define MAGAOX_rawimageRelPath
The relative path to the raw images directory.
Definition paths.hpp:92
#define MAGAOX_env_rawimage
Environment variable setting the relative raw image path.
#define INDI_NEWCALLBACK_DEFN(class, prop)
Define the callback for a new property request.
#define REG_INDI_NEWPROP_NOCB(prop, propName, type)
Register a NEW INDI property with the class, with no callback.
#define INDI_NEWCALLBACK(prop)
Get the name of the static callback wrapper for a new property.
@ OPERATING
The device is operating, other than homing.
@ READY
The device is ready for operation, but is not operating.
#define INDI_VALIDATE_CALLBACK_PROPS(prop1, prop2)
Standard check for matching INDI properties in a callback.
#define INDI_IDLE
Definition indiUtils.hpp:27
#define INDI_BUSY
Definition indiUtils.hpp:29
#define INDI_OK
Definition indiUtils.hpp:28
void updateIfChanged(pcf::IndiProperty &p, const std::string &el, const T &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
Definition indiUtils.hpp:92
void updateSwitchIfChanged(pcf::IndiProperty &p, const std::string &el, const pcf::IndiElement::SwitchStateType &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
std::stringstream msg
const pcf::IndiProperty & ipRecv
mx::error_t fileTimeRelPath(std::string &tstamp, std::string &relPath, time_t ts_sec, long ts_nsec)
Get the timestamp and the relative path based on a time.
Definition dm.hpp:28
static constexpr logPrioT LOG_NOTICE
A normal but significant condition.
static constexpr logPrioT LOG_CRITICAL
The process can not continue and will shut down (fatal)
static constexpr logPrioT LOG_WARNING
A condition has occurred which may become an error, but the process continues.
static constexpr logPrioT LOG_DEBUG
Used for debugging.
#define XWC_SEM_WAIT_TS_RETVOID(ts, sec, nsec)
Add the wait time to a timespec for a sem_timedwait call, with no value returned on error.
Definition semUtils.hpp:20
#define NOT_WRITING
#define STOP_WRITING
#define WRITING
#define START_WRITING
A device base class which saves telemetry.
Definition telemeter.hpp:69
int appShutdown()
Perform telemeter application shutdown.
int loadConfig(appConfigurator &config)
Load the device section from an application configurator.
int appLogic()
Perform telemeter application logic.
int setupConfig(appConfigurator &config)
Setup an application configurator for the device section.
int appStartup()
Starts the telemetry log thread.
int checkRecordTimes(const telT &tel, telTs... tels)
Check the time of the last record for each telemetry type and make an entry if needed.
Software CRITICAL log entry.
Software ERR log entry.