API
 
Loading...
Searching...
No Matches
streamWriter.hpp
Go to the documentation of this file.
1/** \file streamWriter.hpp
2 * \brief The MagAO-X Image Stream Writer
3 *
4 * \author Jared R. Males (jaredmales@gmail.com)
5 *
6 * \ingroup streamWriter_files
7 */
8
9#ifndef streamWriter_hpp
10#define streamWriter_hpp
11
12#include <ImageStreamIO/ImageStruct.h>
13#include <ImageStreamIO/ImageStreamIO.h>
14
15#include <xrif/xrif.h>
16
17#include <mx/sys/timeUtils.hpp>
18
19#include "../../libMagAOX/libMagAOX.hpp" //Note this is included on command line to trigger pch
20#include "../../magaox_git_version.h"
21
22#define NOT_WRITING ( 0 )
23#define START_WRITING ( 1 )
24#define WRITING ( 2 )
25#define STOP_WRITING ( 3 )
26
27//#define SW_DEBUG
28
29namespace MagAOX
30{
31namespace app
32{
33
34/** \defgroup streamWriter ImageStreamIO Stream Writing
35 * \brief Writes the contents of an ImageStreamIO image stream to disk.
36 *
37 * <a href="../handbook/operating/software/apps/streamWriter.html">Application Documentation</a>
38 *
39 * \ingroup apps
40 *
41 */
42
43/** \defgroup streamWriter_files ImageStreamIO Stream Writing
44 * \ingroup streamWriter
45 */
46
47/** MagAO-X application to control writing ImageStreamIO streams to disk.
48 *
49 * \ingroup streamWriter
50 *
51 */
52class streamWriter : public MagAOXApp<>, public dev::telemeter<streamWriter>
53{
55
56 friend class dev::telemeter<streamWriter>;
57
58 // Give the test harness access.
59 friend class streamWriter_test;
60
61 protected:
62 /** \name configurable parameters
63 *@{
64 */
65
66 std::string m_rawimageDir; ///< The path where files will be saved.
67
68 size_t m_maxCircBuffLength{ 1024 }; ///< The maximum length of the circular buffer, in frames
69
70 double m_maxCircBuffSize{ 2048 }; ///< The maximum size of the circular buffer in MB.
71
72 size_t m_maxWriteChunkLength{ 512 }; /**< The maximum number of frames to write at a time. Must
73 be an integer factor of m_maxCircBuffLength.*/
74
75 double m_maxChunkTime{ 10 }; ///< The maximum time before writing regardless of number of frames.
76
77 std::string m_shmimName; ///< The name of the shared memory buffer.
78
79 std::string m_outName; ///< The name to use for outputting files, Default is m_shmimName.
80
81 int m_semaphoreNumber{ 7 }; ///< The image structure semaphore index.
82
83 unsigned m_semWaitSec{
84 0 }; ///< The time in whole sec to wait on the semaphore, to which m_semWaitNSec is added. Default is 0 sec.
85
86 unsigned m_semWaitNSec{ 500000000 }; ///< The time in nsec to wait on the semaphore, added to m_semWaitSec. Max is
87 ///< 999999999. Default is 5e8 nsec.
88
89 int m_lz4accel{ 1 }; ///< The LZ4 acceleration parameter. Larger is faster, but lower compression.
90
91 bool m_compress{ true }; ///< Flag to set whether compression is used. Default true.
92
93 ///@}
94
95 size_t m_circBuffLength{ 1024 }; ///< The length of the circular buffer, in frames
96 double m_circBuffSize{ 2048.0 }; ///< The size of the circular buffer, in MB
97 size_t m_writeChunkLength{ 512 }; ///< The number of frames to write at a time
98
99 size_t m_width{ 0 }; ///< The width of the image
100 size_t m_height{ 0 }; ///< The height of the image
101 uint8_t m_dataType{ 0 }; ///< The ImageStreamIO type code.
102 int m_typeSize{ 0 }; ///< The pixel byte depth
103
104 char *m_rawImageCircBuff{ nullptr };
106
107 size_t m_currImage{ 0 };
108
109 double m_currImageTime{ 0 }; ///< The write-time of the current image
110
111 double m_currChunkStartTime{ 0 }; ///< The write-time of the first image in the chunk
112
113 // Writer book-keeping:
115 NOT_WRITING }; ///< Controls whether or not images are being written, and sequences start and stop of writing.
116
117 uint64_t m_currChunkStart{ 0 }; ///< The circular buffer starting position of the current to-be-written chunk.
118 uint64_t m_nextChunkStart{ 0 }; ///< The circular buffer starting position of the next to-be-written chunk.
119
120 uint64_t m_currSaveStart{ 0 }; ///< The circular buffer position at which to start saving.
121 uint64_t m_currSaveStop{ 0 }; ///< The circular buffer position at which to stop saving.
122
123 uint64_t m_currSaveStopFrameNo{ 0 }; ///< The frame number of the image at which saving stopped (for logging)
124
125 /// The xrif compression handle for image data
126 xrif_t m_xrif{ nullptr };
127
128 /// Storage for the xrif image data file header
129 char *m_xrif_header{ nullptr };
130
131 /// The xrif compression handle for image data
133
134 /// Storage for the xrif timing data file header
135 char *m_xrif_timing_header{ nullptr };
136
137 public:
138 /// Default c'tor
139 streamWriter();
140
141 /// Destructor
143
144 /// Setup the configuration system (called by MagAOXApp::setup())
145 virtual void setupConfig();
146
147 /// load the configuration system results (called by MagAOXApp::setup())
148 virtual void loadConfig();
149
150 /// Startup functions
151 /** Sets up the INDI vars.
152 *
153 */
154 virtual int appStartup();
155
156 /// Main application logic: checks that the framegrabber and stream writer threads are still running, then updates INDI.
157 virtual int appLogic();
158
159 /// Do any needed shutdown tasks: joins the framegrabber and stream writer threads and clears the xrif handles.
160 virtual int appShutdown();
161
162 protected:
163 /** \name SIGSEGV & SIGBUS signal handling
164 * These signals occur as a result of an ImageStreamIO source server resetting (e.g. changing frame sizes).
165 * When they occur a restart of the framegrabber and framewriter thread main loops is triggered.
166 *
167 * @{
168 */
169 bool m_restart{ false };
170
171 static streamWriter *m_selfWriter; ///< Static pointer to this (set in constructor). Used for getting out of the
172 ///< static SIGSEGV handler.
173
174 /// Initialize the xrif system.
175 /** Allocates the handles and headers pointers.
176 *
177 * \returns 0 on success.
178 * \returns -1 on error.
179 */
180 int initialize_xrif();
181
182 /// Sets the handler for SIGSEGV and SIGBUS
183 /** These are caused by ImageStreamIO server resets.
184 */
185 int setSigSegvHandler();
186
187 /// The handler called when SIGSEGV or SIGBUS is received, which will be due to ImageStreamIO server resets. Just a
188 /// wrapper for handlerSigSegv.
189 static void _handlerSigSegv( int signum, siginfo_t *siginf, void *ucont );
190
191 /// Handles SIGSEGV and SIGBUS. Sets m_restart to true.
192 void handlerSigSegv( int signum, siginfo_t *siginf, void *ucont );
193 ///@}
194
195 /** \name Framegrabber Thread
196 * This thread monitors the ImageStreamIO buffer and copies its images to the circular buffer.
197 *
198 * @{
199 */
200 int m_fgThreadPrio{ 1 }; ///< Priority of the framegrabber thread, should normally be > 0.
201
202 std::string m_fgCpuset; ///< The cpuset for the framegrabber thread. Ignored if empty (the default).
203
204 std::thread m_fgThread; ///< A separate thread for the actual framegrabbings
205
206 bool m_fgThreadInit{ true }; ///< Synchronizer to ensure f.g. thread initializes before doing dangerous things.
207
208 pid_t m_fgThreadID{ 0 }; ///< F.g. thread PID.
209
210 pcf::IndiProperty m_fgThreadProp; ///< The property to hold the f.g. thread details.
211
212 public:
213 static void getCircBuffLengths( size_t &circBuffLength,
214 double &circBuffSize,
215 size_t &writeChunkLength,
216 size_t maxCircBuffLength,
217 double maxCircBuffSize,
218 size_t maxWriteChunkLength,
219 uint32_t width,
220 uint32_t height,
221 size_t typeSize );
222
223 protected:
224 /// Worker function to allocate the circular buffers.
225 /** This takes place in the fg thread after connecting to the stream.
226 *
227 * \returns 0 on success.
228 * \returns -1 on error.
229 */
230 int allocate_circbufs();
231
232 /// Worker function to configure and allocate the xrif handles.
233 /** This takes place in the fg thread after connecting to the stream.
234 *
235 * \returns 0 on success.
236 * \returns -1 on error.
237 */
238 int allocate_xrif();
239
240 /// Thread starter for the framegrabber thread, called on thread construction. Calls fgThreadExec.
241 static void fgThreadStart( streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */ );
242
243 /// Execute the frame grabber main loop.
244 void fgThreadExec();
245
246 ///@}
247
248 /** \name Stream Writer Thread
249 * This thread writes chunks of the circular buffer to disk.
250 *
251 * @{
252 */
253 int m_swThreadPrio{ 1 }; ///< Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
254
255 std::string m_swCpuset; ///< The cpuset for the stream writer thread. Ignored if empty (the default).
256
257 sem_t m_swSemaphore; ///< Semaphore used to synchronize the fg thread and the sw thread.
258
259 std::thread m_swThread; ///< A separate thread for the actual writing
260
261 bool m_swThreadInit{ true }; ///< Synchronizer to ensure s.w. thread initializes before doing dangerous things.
262
263 pid_t m_swThreadID{ 0 }; ///< S.w. thread pid.
264
265 pcf::IndiProperty m_swThreadProp; ///< The property to hold the s.w. thread details.
266
267 size_t m_fnameSz{ 0 };
268
269 char *m_fname{ nullptr };
270
271 std::string m_fnameBase;
272
273 /// Thread starter for the stream writer thread, called on thread construction. Calls swThreadExec.
274 static void swThreadStart( streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */ );
275
276 /// Execute the stream writer main loop.
277 void swThreadExec();
278
279 /// Function called when semaphore is raised to do the encode and write.
280 int doEncode();
281 ///@}
282
283 // INDI:
284 protected:
285 // declare our properties
286 pcf::IndiProperty m_indiP_writing;
287
288 pcf::IndiProperty m_indiP_xrifStats;
289
290 public:
292
293 void updateINDI();
294
295 /** \name Telemeter Interface
296 *
297 * @{
298 */
299 int checkRecordTimes();
300
301 int recordTelem( const telem_saving_state * );
302
303 int recordSavingState( bool force = false );
304 int recordSavingStats( bool force = false );
305
306 ///@}
307};
308
309// Set self pointer to null so app starts up uninitialized.
311
/// Default constructor.
/** Passes the git SHA1 and repo-modified macros to the MagAOXApp base, sets m_powerMgtEnabled to false,
  * and stores this instance in the static m_selfWriter pointer (used to get out of the static signal handler).
  */
312streamWriter::streamWriter() : MagAOXApp( MAGAOX_CURRENT_SHA1, MAGAOX_REPO_MODIFIED )
313{
314 m_powerMgtEnabled = false;
315
    // Record this instance so the static SIGSEGV/SIGBUS handler can reach it.
316 m_selfWriter = this;
317
318 return;
319}
320
322{
323 if( m_xrif )
325
326 if( m_xrif_header )
328
329 if( m_xrif_timing )
331
334
335 return;
336}
337
339{
340 config.add( "writer.savePath",
341 "",
342 "writer.savePath",
343 argType::Required,
344 "writer",
345 "savePath",
346 false,
347 "string",
348 "The absolute path where images are saved. Will use MagAO-X default if not set." );
349
350 config.add( "writer.maxCircBuffLength",
351 "",
352 "writer.maxCircBuffLength",
353 argType::Required,
354 "writer",
355 "maxCircBuffLength",
356 false,
357 "size_t",
358 "The maximum length in frames of the circular buffer. Should be an integer multiple of and larger than "
359 "maxWriteChunkLength." );
360
361 config.add( "writer.maxCircBuffSize",
362 "",
363 "writer.maxCircBuffSize",
364 argType::Required,
365 "writer",
366 "maxCircBuffSize",
367 false,
368 "double",
369 "The maximum size in MB of the circular buffer. Should be sized to hold at least 2 of the maximum "
370 "frame size." );
371
372 config.add(
373 "writer.maxWriteChunkLength",
374 "",
375 "writer.maxWriteChunkLength",
376 argType::Required,
377 "writer",
378 "maxWriteChunkLength",
379 false,
380 "size_t",
381 "The maximum length in frames of the chunks to write to disk. Should be smaller than maxCircBuffLength." );
382
383 config.add( "writer.maxChunkTime",
384 "",
385 "writer.maxChunkTime",
386 argType::Required,
387 "writer",
388 "maxChunkTime",
389 false,
390 "float",
391 "The max length in seconds of the chunks to write to disk. Default is 60 sec." );
392
393 config.add( "writer.threadPrio",
394 "",
395 "writer.threadPrio",
396 argType::Required,
397 "writer",
398 "threadPrio",
399 false,
400 "int",
401 "The real-time priority of the stream writer thread." );
402
403 config.add( "writer.cpuset",
404 "",
405 "writer.cpuset",
406 argType::Required,
407 "writer",
408 "cpuset",
409 false,
410 "int",
411 "The cpuset for the writer thread." );
412
413 config.add( "writer.compress",
414 "",
415 "writer.compress",
416 argType::Required,
417 "writer",
418 "compress",
419 false,
420 "bool",
421 "Flag to set whether compression is used. Default true." );
422
423 config.add( "writer.lz4accel",
424 "",
425 "writer.lz4accel",
426 argType::Required,
427 "writer",
428 "lz4accel",
429 false,
430 "int",
431 "The LZ4 acceleration parameter. Larger is faster, but lower compression." );
432
433 config.add( "writer.outName",
434 "",
435 "writer.outName",
436 argType::Required,
437 "writer",
438 "outName",
439 false,
440 "int",
441 "The name to use for output files. Default is the shmimName." );
442
443 config.add( "framegrabber.shmimName",
444 "",
445 "framegrabber.shmimName",
446 argType::Required,
447 "framegrabber",
448 "shmimName",
449 false,
450 "int",
451 "The name of the stream to monitor. From /tmp/shmimName.im.shm." );
452
453 config.add( "framegrabber.semaphoreNumber",
454 "",
455 "framegrabber.semaphoreNumber",
456 argType::Required,
457 "framegrabber",
458 "semaphoreNumber",
459 false,
460 "int",
461 "The semaphore to wait on. Default is 7." );
462
463 config.add( "framegrabber.semWait",
464 "",
465 "framegrabber.semWait",
466 argType::Required,
467 "framegrabber",
468 "semWait",
469 false,
470 "int",
471 "The time in nsec to wait on the semaphore. Max is 999999999. Default is 5e8 nsec." );
472
473 config.add( "framegrabber.threadPrio",
474 "",
475 "framegrabber.threadPrio",
476 argType::Required,
477 "framegrabber",
478 "threadPrio",
479 false,
480 "int",
481 "The real-time priority of the framegrabber thread." );
482
483 config.add( "framegrabber.cpuset",
484 "",
485 "framegrabber.cpuset",
486 argType::Required,
487 "framegrabber",
488 "cpuset",
489 false,
490 "string",
491 "The cpuset for the framegrabber thread." );
492
493 telemeterT::setupConfig( config );
494}
495
497{
498
499 config( m_maxCircBuffLength, "writer.maxCircBuffLength" );
500 config( m_maxCircBuffSize, "writer.maxCircBuffSize" );
501 config( m_maxWriteChunkLength, "writer.maxWriteChunkLength" );
502 config( m_maxChunkTime, "writer.maxChunkTime" );
503 config( m_swThreadPrio, "writer.threadPrio" );
504 config( m_swCpuset, "writer.cpuset" );
505 config( m_compress, "writer.compress" );
506 config( m_lz4accel, "writer.lz4accel" );
508 {
510 }
512 {
514 }
515
516 config( m_shmimName, "framegrabber.shmimName" );
517
519 config( m_outName, "writer.outName" );
520
521 config( m_semaphoreNumber, "framegrabber.semaphoreNumber" );
522 config( m_semWaitNSec, "framegrabber.semWait" );
523
524 config( m_fgThreadPrio, "framegrabber.threadPrio" );
525 config( m_fgCpuset, "framegrabber.cpuset" );
526
527 // Set some defaults
528 // Setup default log path
530 config( m_rawimageDir, "writer.savePath" );
531
532 if( telemeterT::loadConfig( config ) < 0 )
533 {
534 log<text_log>( "Error during telemeter config", logPrio::LOG_CRITICAL );
535 m_shutdown = true;
536 }
537}
538
540{
541 // Create save directory.
542 errno = 0;
543 if( mkdir( m_rawimageDir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH ) < 0 )
544 {
545 if( errno != EEXIST )
546 {
547 std::stringstream logss;
548 logss << "Failed to create image directory (" << m_rawimageDir << "). Errno says: " << strerror( errno );
550
551 return -1;
552 }
553 }
554
555 // set up the INDI properties
558
559 // Register the stats INDI property
560 REG_INDI_NEWPROP_NOCB( m_indiP_xrifStats, "xrif", pcf::IndiProperty::Number );
561 m_indiP_xrifStats.setLabel( "xrif compression performance" );
562
563 indi::addNumberElement<float>( m_indiP_xrifStats, "ratio", 0, 1.0, 0.0, "%0.2f", "Compression Ratio" );
564
565 indi::addNumberElement<float>( m_indiP_xrifStats,
566 "differenceMBsec",
567 0,
568 std::numeric_limits<float>::max(),
569 0.0,
570 "%0.2f",
571 "Differencing Rate [MB/sec]" );
572
573 indi::addNumberElement<float>( m_indiP_xrifStats,
574 "reorderMBsec",
575 0,
576 std::numeric_limits<float>::max(),
577 0.0,
578 "%0.2f",
579 "Reordering Rate [MB/sec]" );
580
581 indi::addNumberElement<float>( m_indiP_xrifStats,
582 "compressMBsec",
583 0,
584 std::numeric_limits<float>::max(),
585 0.0,
586 "%0.2f",
587 "Compression Rate [MB/sec]" );
588
589 indi::addNumberElement<float>( m_indiP_xrifStats,
590 "encodeMBsec",
591 0,
592 std::numeric_limits<float>::max(),
593 0.0,
594 "%0.2f",
595 "Total Encoding Rate [MB/sec]" );
596
597 indi::addNumberElement<float>( m_indiP_xrifStats,
598 "differenceFPS",
599 0,
600 std::numeric_limits<float>::max(),
601 0.0,
602 "%0.2f",
603 "Differencing Rate [f.p.s.]" );
604
605 indi::addNumberElement<float>( m_indiP_xrifStats,
606 "reorderFPS",
607 0,
608 std::numeric_limits<float>::max(),
609 0.0,
610 "%0.2f",
611 "Reordering Rate [f.p.s.]" );
612
613 indi::addNumberElement<float>( m_indiP_xrifStats,
614 "compressFPS",
615 0,
616 std::numeric_limits<float>::max(),
617 0.0,
618 "%0.2f",
619 "Compression Rate [f.p.s.]" );
620
621 indi::addNumberElement<float>( m_indiP_xrifStats,
622 "encodeFPS",
623 0,
624 std::numeric_limits<float>::max(),
625 0.0,
626 "%0.2f",
627 "Total Encoding Rate [f.p.s.]" );
628
629 // Now set up the framegrabber and writer threads.
630 // - need SIGSEGV and SIGBUS handling for ImageStreamIO restarts
631 // - initialize the semaphore
632 // - start the threads
633
634 if( setSigSegvHandler() < 0 )
635 return log<software_error, -1>( { __FILE__, __LINE__ } );
636
637 if( sem_init( &m_swSemaphore, 0, 0 ) < 0 )
638 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "Initializing S.W. semaphore" } );
639
640 // Check if we have a safe writeChunkLength
642 {
643 return log<software_critical, -1>(
644 { __FILE__, __LINE__, "Write chunk length is not a divisor of circular buffer length." } );
645 }
646
647 if( initialize_xrif() < 0 )
649
656 "framegrabber",
657 this,
658 fgThreadStart ) < 0 )
659 {
660 return log<software_critical, -1>( { __FILE__, __LINE__ } );
661 }
662
669 "streamwriter",
670 this,
671 swThreadStart ) < 0 )
672 {
674 }
675
676 if( telemeterT::appStartup() < 0 )
677 {
678 return log<software_error, -1>( { __FILE__, __LINE__ } );
679 }
680
681 return 0;
682}
683
685{
686
687 // first do a join check to see if other threads have exited.
688 // these will throw if the threads are really gone
689 try
690 {
691 if( pthread_tryjoin_np( m_fgThread.native_handle(), 0 ) == 0 )
692 {
693 log<software_error>( { __FILE__, __LINE__, "framegrabber thread has exited" } );
694 return -1;
695 }
696 }
697 catch( ... )
698 {
699 log<software_error>( { __FILE__, __LINE__, "framegrabber thread has exited" } );
700 return -1;
701 }
702
703 try
704 {
705 if( pthread_tryjoin_np( m_swThread.native_handle(), 0 ) == 0 )
706 {
707 log<software_error>( { __FILE__, __LINE__, "stream thread has exited" } );
708 return -1;
709 }
710 }
711 catch( ... )
712 {
713 log<software_error>( { __FILE__, __LINE__, "streamwriter thread has exited" } );
714 return -1;
715 }
716
717 switch( m_writing )
718 {
719 case NOT_WRITING:
721 break;
722 default:
724 }
725
727 {
728 if( telemeterT::appLogic() < 0 )
729 {
731 return 0;
732 }
733 }
734
735 updateINDI();
736
737 return 0;
738}
739
741{
743 updateINDI();
744
745 try
746 {
747 if( m_fgThread.joinable() )
748 {
749 m_fgThread.join();
750 }
751 }
752 catch( ... )
753 {
754 }
755
756 try
757 {
758 if( m_swThread.joinable() )
759 {
760 m_swThread.join();
761 }
762 }
763 catch( ... )
764 {
765 }
766
767 if( m_xrif )
768 {
770 m_xrif = nullptr;
771 }
772
773 if( m_xrif_timing )
774 {
776 m_xrif_timing = nullptr;
777 }
778
780
781 return 0;
782}
783
785{
787 if( rv != XRIF_NOERROR )
788 {
789 return log<software_critical, -1>(
790 { __FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error." } );
791 }
792
793 if( m_compress )
794 {
796 if( rv != XRIF_NOERROR )
797 {
798 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
799 }
800 }
801 else
802 {
803 std::cerr << "not compressing . . . \n";
805 if( rv != XRIF_NOERROR )
806 {
807 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
808 }
809 }
810
811 errno = 0;
812 m_xrif_header = reinterpret_cast< char *>(malloc( XRIF_HEADER_SIZE * sizeof( char ) ));
813 if( m_xrif_header == NULL )
814 {
815 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "xrif header allocation failed." } );
816 }
817
819 if( rv != XRIF_NOERROR )
820 {
821 return log<software_critical, -1>(
822 { __FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error." } );
823 }
824
825 // m_xrif_timing->reorder_method = XRIF_REORDER_NONE;
827 if( rv != XRIF_NOERROR )
828 {
829 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
830 }
831
832 errno = 0;
833 m_xrif_timing_header = reinterpret_cast< char *>(malloc( XRIF_HEADER_SIZE * sizeof( char ) ));
835 {
836 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "xrif header allocation failed." } );
837 }
838
839 return 0;
840}
841
843{
844 struct sigaction act;
845 sigset_t set;
846
847 act.sa_sigaction = &streamWriter::_handlerSigSegv;
848 act.sa_flags = SA_SIGINFO;
849 sigemptyset( &set );
850 act.sa_mask = set;
851
852 errno = 0;
853 if( sigaction( SIGSEGV, &act, 0 ) < 0 )
854 {
855 std::string logss = "Setting handler for SIGSEGV failed. Errno says: ";
856 logss += strerror( errno );
857
859
860 return -1;
861 }
862
863 errno = 0;
864 if( sigaction( SIGBUS, &act, 0 ) < 0 )
865 {
866 std::string logss = "Setting handler for SIGBUS failed. Errno says: ";
867 logss += strerror( errno );
868
870
871 return -1;
872 }
873
874 log<text_log>( "Installed SIGSEGV/SIGBUS signal handler.", logPrio::LOG_DEBUG );
875
876 return 0;
877}
878
879void streamWriter::_handlerSigSegv( int signum, siginfo_t *siginf, void *ucont )
880{
882}
883
/// Instance-level handler for SIGSEGV and SIGBUS.
/** The three arguments are ignored; the only effect is to set m_restart to true.
  *
  * \param signum [in] the signal number (unused)
  * \param siginf [in] the signal info structure (unused)
  * \param ucont  [in] the user context pointer (unused)
  */
884void streamWriter::handlerSigSegv( int signum, siginfo_t *siginf, void *ucont )
885{
    // The casts to void mark the parameters as intentionally unused.
886 static_cast<void>( signum );
887 static_cast<void>( siginf );
888 static_cast<void>( ucont );
889
    // Flag read by the framegrabber thread loops (they test !m_restart) to trigger a reconnect.
890 m_restart = true;
891
892 return;
893}
894
/// Computes the circular buffer length and size (in MB) and the write chunk length for a given frame geometry.
/** The frame size in bytes is width * height * typeSize. If maxCircBuffLength frames fit within maxCircBuffSize MB
  * (1 MB = 1048576 bytes), circBuffSize is set to that total and the function returns. Otherwise circBuffLength is
  * set to the number of whole frames that fit in maxCircBuffSize MB, circBuffSize is recomputed from it, and a loop
  * runs until writeChunkLength divides circBuffLength evenly.
  *
  * NOTE(review): several statements of this definition are absent from this listing (inside the first if block, the
  * odd-length block, the zero-writeChunkLength block, and the while loop body), so the exact adjustments made to
  * circBuffLength and writeChunkLength are not shown here -- confirm against the full source.
  *
  * \param circBuffLength      [out] the circular buffer length, in frames
  * \param circBuffSize        [out] the circular buffer size, in MB
  * \param writeChunkLength    [out] the number of frames to write at a time
  * \param maxCircBuffLength   [in] the maximum circular buffer length, in frames
  * \param maxCircBuffSize     [in] the maximum circular buffer size, in MB
  * \param maxWriteChunkLength [in] the maximum write chunk length, in frames (not referenced in the lines visible here)
  * \param width               [in] the frame width, in pixels
  * \param height              [in] the frame height, in pixels
  * \param typeSize            [in] the size of one pixel, in bytes
  */
895void streamWriter::getCircBuffLengths( size_t &circBuffLength,
896 double &circBuffSize,
897 size_t &writeChunkLength,
898 size_t maxCircBuffLength,
899 double maxCircBuffSize,
900 size_t maxWriteChunkLength,
901 uint32_t width,
902 uint32_t height,
903 size_t typeSize )
904{
905 static constexpr double MB = 1048576.0;
906
    // Total bytes needed to hold the maximum number of frames.
907 size_t isz = width * height * typeSize * maxCircBuffLength;
908
    // Case 1: the maximum-length buffer fits within the size limit.
909 if( isz <= maxCircBuffSize * MB )
910 {
912 circBuffSize = isz / MB;
914
915 return;
916 }
917
    // Case 2: too big. Use the number of whole frames that fit in the size limit (truncating).
918 circBuffLength = maxCircBuffSize * MB / ( width * height * typeSize );
919
    // Odd lengths get special handling (statement not shown in this listing).
920 if(circBuffLength % 2 == 1)
921 {
923 }
924
925 circBuffSize = ( width * height * typeSize * circBuffLength ) / MB;
926
928
    // A single frame does not fit: nothing more to compute (also avoids a modulo by an unadjusted chunk length).
929 if( circBuffLength == 0 )
930 {
931 return;
932 }
933
    // Guards the modulo below against a zero divisor (adjustment statement not shown in this listing).
934 if( writeChunkLength == 0 )
935 {
937 }
938
    // Loop until writeChunkLength is an integer factor of circBuffLength (loop body not shown in this listing).
939 while( circBuffLength % writeChunkLength != 0 )
940 {
942 }
943
944 return;
945}
946
948{
949
956 m_width,
957 m_height,
958 m_typeSize );
959
960 if( m_circBuffLength < 2 )
961 {
962 return log<software_critical, -1>( { __FILE__, __LINE__, "frame size too large to fit in maxCircBuffSize" } );
963 }
964
966 {
967 return log<software_critical, -1>(
968 { __FILE__, __LINE__, "writeChunkLength is not smaller than circBuffLength" } );
969 }
970
972 {
973 return log<software_critical, -1>(
974 { __FILE__, __LINE__, "writeChunkLength is not an integer factor of circBuffLength" } );
975 }
976
977 std::string msg = "Set circ buff length: " + std::to_string( m_circBuffLength ) + " frames (";
978 msg += std::to_string( m_circBuffSize ) + " MB). Write chunk length: " + std::to_string( m_writeChunkLength );
979 msg += " frames.";
980
982
984 {
986 }
987
988 errno = 0;
989 m_rawImageCircBuff = reinterpret_cast< char *>(malloc( m_width * m_height * m_typeSize * m_circBuffLength ));
990
991 if( m_rawImageCircBuff == NULL )
992 {
993 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "buffer allocation failure" } );
994 }
995
996 if( m_timingCircBuff )
997 {
999 }
1000
1001 errno = 0;
1002 m_timingCircBuff = reinterpret_cast<uint64_t *>(malloc( 5 * sizeof( uint64_t ) * m_circBuffLength ));
1003 if( m_timingCircBuff == NULL )
1004 {
1005 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "buffer allocation failure" } );
1006 }
1007
1008 return 0;
1009}
1010
1012{
1013 // Set up the image data xrif handle
1015
1016 if( m_compress )
1017 {
1019 if( rv != XRIF_NOERROR )
1020 {
1021 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1022 }
1023 }
1024 else
1025 {
1026 std::cerr << "not compressing . . . \n";
1028 if( rv != XRIF_NOERROR )
1029 {
1030 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1031 }
1032 }
1033
1035 if( rv != XRIF_NOERROR )
1036 {
1037 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_set_size error." } );
1038 }
1039
1041 if( rv != XRIF_NOERROR )
1042 {
1043 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_raw error." } );
1044 }
1045
1047 if( rv != XRIF_NOERROR )
1048 {
1049 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error." } );
1050 }
1051
1052 // Set up the timing data xrif handle
1054 if( rv != XRIF_NOERROR )
1055 {
1056 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1057 }
1058
1060 if( rv != XRIF_NOERROR )
1061 {
1062 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_set_size error." } );
1063 }
1064
1066 if( rv != XRIF_NOERROR )
1067 {
1068 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_raw error." } );
1069 }
1070
1072 if( rv != XRIF_NOERROR )
1073 {
1074 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error." } );
1075 }
1076
1077 return 0;
1078}
1079
1081{
1082 o->fgThreadExec();
1083}
1084
1086{
1088
1089 // Wait for the thread starter to finish initializing this thread.
1090 while( m_fgThreadInit == true && m_shutdown == 0 )
1091 {
1092 sleep( 1 );
1093 }
1094
1095 timespec missing_ts;
1096
1097 IMAGE image;
1098 ino_t inode = 0; // The inode of the image stream file
1099
1100 bool opened = false;
1101
1102 while( m_shutdown == 0 )
1103 {
1104 /* Initialize ImageStreamIO
1105 */
1106 opened = false;
1107 m_restart = false; // Set this up front, since we're about to restart.
1108
1109 sem_t *sem{ nullptr }; ///< The semaphore to monitor for new image data
1110
1111 int logged = 0;
1112 while( !opened && !m_shutdown && !m_restart )
1113 {
1114 // b/c ImageStreamIO prints every single time, and latest version don't support stopping it yet,
1115 // and that isn't thread-safe-able anyway
1116 // we do our own checks. This is the same code in ImageStreamIO_openIm...
1117 int SM_fd;
1118 char SM_fname[200];
1119 ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1120 SM_fd = open( SM_fname, O_RDWR );
1121 if( SM_fd == -1 )
1122 {
1123 if( !logged )
1124 {
1125 log<text_log>( "ImageStream " + m_shmimName + " not found (yet). Retrying . . .",
1127 }
1128 logged = 1;
1129 sleep( 1 ); // be patient
1130 continue;
1131 }
1132
1133 // Found and opened, close it and then use ImageStreamIO
1134 logged = 0;
1135 close( SM_fd );
1136
1137 if( ImageStreamIO_openIm( &image, m_shmimName.c_str() ) == 0 )
1138 {
1139 if( image.md[0].sem <=
1140 m_semaphoreNumber ) ///<\todo this isn't right--> isn't there a define in cacao to use?
1141 {
1143 mx::sys::sleep( 1 ); // We just need to wait for the server process to finish startup.
1144 }
1145 else
1146 {
1147 opened = true;
1148
1149 char SM_fname[200];
1150 ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1151
1152 struct stat buffer;
1153 int rv = stat( SM_fname, &buffer );
1154
1155 if( rv != 0 )
1156 {
1158 __LINE__,
1159 errno,
1160 "Could not get inode for " + m_shmimName +
1161 ". Source process will need to be restarted." } );
1163 return;
1164 }
1165 inode = buffer.st_ino;
1166 }
1167 }
1168 else
1169 {
1170 mx::sys::sleep( 1 ); // be patient
1171 }
1172 }
1173
1174 if( m_restart )
1175 continue; // this is kinda dumb. we just go around on restart, so why test in the while loop at all?
1176
1177 if( m_shutdown || !opened )
1178 {
1179 if( !opened )
1180 return;
1181
1183 return;
1184 }
1185
1186 // now get a good semaphore
1188 ImageStreamIO_getsemwaitindex( &image, m_semaphoreNumber ); // ask for semaphore we had before
1189
1190 if( m_semaphoreNumber < 0 )
1191 {
1193 { __FILE__,
1194 __LINE__,
1195 "No valid semaphore found for " + m_shmimName + ". Source process will need to be restarted." } );
1196 return;
1197 }
1198
1200 __LINE__,
1201 "got semaphore index " + std::to_string( m_semaphoreNumber ) + " for " + m_shmimName } );
1202
1204
1205 sem = image.semptr[m_semaphoreNumber]; ///< The semaphore to monitor for new image data
1206
1207 m_dataType = image.md[0].datatype;
1209 m_width = image.md[0].size[0];
1210 m_height = image.md[0].size[1];
1211 size_t length;
1212 if( image.md[0].naxis == 3 )
1213 {
1214 length = image.md[0].size[2];
1215 }
1216 else
1217 {
1218 length = 1;
1219 }
1220 std::cerr << "connected"
1221 << " " << m_width << "x" << m_height << "x" << (int)m_dataType << " (" << m_typeSize << ")"
1222 << std::endl;
1223
1224 // Now allocate the circBuffs
1225 if( allocate_circbufs() < 0 )
1226 {
1227 return; // will cause shutdown!
1228 }
1229
1230 // And allocate the xrifs
1231 if( allocate_xrif() < 0 )
1232 {
1233 return; // Will cause shutdown!
1234 }
1235
1236 uint8_t atype;
1237 size_t snx, sny, snz;
1238
1239 uint64_t curr_image; // The current cnt1 index
1240 m_currImage = 0;
1241 m_currChunkStart = 0;
1242 m_nextChunkStart = 0;
1243
1244 // Initialized curr_image ...
1245 if( image.md[0].naxis > 2 )
1246 {
1247 curr_image = image.md[0].cnt1;
1248 }
1249 else
1250 {
1251 curr_image = 0;
1252 }
1253
1254 uint64_t last_cnt0; // = ((uint64_t)-1);
1255
1256 // so we can initialize last_cnt0 to avoid frame skip on startup
1257 if( image.cntarray )
1258 {
1259 last_cnt0 = image.cntarray[curr_image];
1260 }
1261 else
1262 {
1263 last_cnt0 = image.md[0].cnt0;
1264 }
1265
1266 int cnt0flag = 0;
1267
1268 bool restartWriting = false; // flag to prevent logging on a logging restart
1269
1270 // This is the main image grabbing loop.
1271 while( !m_shutdown && !m_restart )
1272 {
1273 timespec ts;
1275
1276 if( sem_timedwait( sem, &ts ) == 0 )
1277 {
1278 if( image.md[0].naxis > 2 )
1279 {
1280 curr_image = image.md[0].cnt1;
1281 }
1282 else
1283 {
1284 curr_image = 0;
1285 }
1286
1287 atype = image.md[0].datatype;
1288 snx = image.md[0].size[0];
1289 sny = image.md[0].size[1];
1290 if( image.md[0].naxis == 3 )
1291 {
1292 snz = image.md[0].size[2];
1293 }
1294 else
1295 {
1296 snz = 1;
1297 }
1298
1299 if( atype != m_dataType || snx != m_width || sny != m_height || snz != length )
1300 {
1301 break; // exit the nearest while loop and get the new image setup.
1302 }
1303
1304 if( m_shutdown || m_restart )
1305 {
1306 break; // Check for exit signals
1307 }
1308
1310 if( image.cntarray )
1311 {
1312 new_cnt0 = image.cntarray[curr_image];
1313 }
1314 else
1315 {
1316 new_cnt0 = image.md[0].cnt0;
1317 }
1318
1319#ifdef SW_DEBUG
1320 std::cerr << "new_cnt0: " << new_cnt0 << "\n";
1321#endif
1322
1323 ///\todo cleanup skip frame handling.
1324 if( new_cnt0 == last_cnt0 ) //<- this probably isn't useful really
1325 {
1326 log<text_log>( "semaphore raised but cnt0 has not changed -- we're probably getting behind",
1328 ++cnt0flag;
1329 if( cnt0flag > 10 )
1330 {
1331 m_restart = true; // if we get here 10 times then something else is wrong.
1332 }
1333 continue;
1334 }
1335
1336 if( new_cnt0 - last_cnt0 > 1 ) //<- this is what we want to check.
1337 {
1338 log<text_log>( "cnt0 changed by more than 1. Frame skipped.", logPrio::LOG_WARNING );
1339 }
1340
1341 cnt0flag = 0;
1342
1344
1346 char *curr_src = reinterpret_cast< char *>(image.array.raw) + curr_image * m_width * m_height * m_typeSize;
1347
1349
1351
1352 if( image.cntarray )
1353 {
1354 curr_timing[0] = image.cntarray[curr_image];
1355 curr_timing[1] = image.atimearray[curr_image].tv_sec;
1356 curr_timing[2] = image.atimearray[curr_image].tv_nsec;
1357 curr_timing[3] = image.writetimearray[curr_image].tv_sec;
1358 curr_timing[4] = image.writetimearray[curr_image].tv_nsec;
1359 }
1360 else
1361 {
1362 curr_timing[0] = image.md[0].cnt0;
1363 curr_timing[1] = image.md[0].atime.tv_sec;
1364 curr_timing[2] = image.md[0].atime.tv_nsec;
1365 curr_timing[3] = image.md[0].writetime.tv_sec;
1366 curr_timing[4] = image.md[0].writetime.tv_nsec;
1367 }
1368
1369 // Check if we need to time-stamp ourselves -- for old cacao streams
1370 if( curr_timing[1] == 0 )
1371 {
1372
1374 {
1375 log<software_critical>( { __FILE__, __LINE__, errno, 0, "clock_gettime" } );
1376 return;
1377 }
1378
1379 curr_timing[1] = missing_ts.tv_sec;
1380 curr_timing[2] = missing_ts.tv_nsec;
1381 }
1382
1383 // just set w-time to a-time if it's missing
1384 if( curr_timing[3] == 0 )
1385 {
1386 curr_timing[3] = curr_timing[1];
1387 curr_timing[4] = curr_timing[2];
1388 }
1389
1390 m_currImageTime = 1.0 * curr_timing[3] + ( 1.0 * curr_timing[4] ) / 1e9;
1391
1392 if( m_shutdown && m_writing == WRITING )
1393 {
1395 }
1396
1397 switch( m_writing )
1398 {
1399 case START_WRITING:
1400
1404
1405 if( !restartWriting ) // We only log if this is really a start
1406 {
1407 log<saving_start>( { 1, new_cnt0 } );
1408 }
1409 else // on a restart after a timeout we don't log
1410 {
1411 restartWriting = false;
1412 }
1413
1415
1416 // fall through
1417 case WRITING:
1419 {
1423
1424#ifdef SW_DEBUG
1425 std::cerr << __FILE__ << " " << __LINE__ << " WRITING " << m_currImage << " "
1426 << m_nextChunkStart << " "
1427 << ( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 ) << " "
1429 << "\n";
1430#endif
1431
1432 // Now tell the writer to get going
1433 if( sem_post( &m_swSemaphore ) < 0 )
1434 {
1435 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1436 return;
1437 }
1438
1441 {
1442 m_nextChunkStart = 0;
1443 }
1444
1447 }
1449 {
1453
1454#ifdef SW_DEBUG
1455 std::cerr << __FILE__ << " " << __LINE__ << " IMAGE TIME WRITING " << m_currImage << " "
1456 << m_nextChunkStart << " "
1457 << ( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 ) << " "
1459 << "\n";
1460#endif
1461
1462 // Now tell the writer to get going
1463 if( sem_post( &m_swSemaphore ) < 0 )
1464 {
1465 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1466 return;
1467 }
1468
1470 restartWriting = true;
1471 }
1472 break;
1473
1474 case STOP_WRITING:
1478
1479#ifdef SW_DEBUG
1480 std::cerr << __FILE__ << " " << __LINE__ << " STOP_WRITING\n";
1481#endif
1482
1483 // Now tell the writer to get going
1484 if( sem_post( &m_swSemaphore ) < 0 )
1485 {
1486 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1487 return;
1488 }
1489 restartWriting = false;
1490 break;
1491
1492 default:
1493 break;
1494 }
1495
1496 ++m_currImage;
1498 {
1499 m_currImage = 0;
1500 }
1501 }
1502 else
1503 {
1504 // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1505 // we can also get here if a signal interrupts the sem wait which is triggered by INDI callbacks
1506 switch( m_writing )
1507 {
1508 case WRITING:
1509 // Here, if there is at least 1 image, we check for delta-time > m_maxChunkTime
1510 // then write
1511 if( ( m_currImage - m_nextChunkStart > 0 ) &&
1512 ( mx::sys::get_curr_time() - m_currChunkStartTime > m_maxChunkTime ) )
1513 {
1517
1518#ifdef SW_DEBUG
1519 std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT WRITING " << " " << m_currImage << " "
1520 << m_nextChunkStart << " " << ( m_currImage - m_nextChunkStart ) << " " << last_cnt0
1521 << "\n";
1522#endif
1523
1524 // Now tell the writer to get going
1525 if( sem_post( &m_swSemaphore ) < 0 )
1526 {
1527 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1528 return;
1529 }
1530
1532 restartWriting = true;
1533 }
1534 break;
1535 case STOP_WRITING:
1536 // If we timed-out while STOP_WRITING is set, we trigger a write.
1540
1541#ifdef SW_DEBUG
1542 std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT STOP_WRITING\n";
1543#endif
1544
1545 // Now tell the writer to get going
1546 if( sem_post( &m_swSemaphore ) < 0 )
1547 {
1548 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1549 return;
1550 }
1551 restartWriting = false;
1552 break;
1553 default:
1554 break;
1555 }
1556
1557 if( image.md[0].sem <= 0 )
1558 {
1559 break; // Indicates that the server has cleaned up.
1560 }
1561
1562 // Check for why we timed out
1563 if( errno == EINTR )
1564 {
1565 break; // This will indicate time to shutdown, loop will exit normally flags set.
1566 }
1567
1568 // ETIMEDOUT just means we should wait more.
1569 // Otherwise, report an error.
1570 if( errno != ETIMEDOUT )
1571 {
1572 log<software_error>( { __FILE__, __LINE__, errno, "sem_timedwait" } );
1573 break;
1574 }
1575
1576 // Check if the file has disappeared.
1577 int SM_fd;
1578 char SM_fname[200];
1579 ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1580 SM_fd = open( SM_fname, O_RDWR );
1581 if( SM_fd == -1 )
1582 {
1583 m_restart = true;
1584 }
1585 close( SM_fd );
1586
1587 // Check if the inode changed
1588 struct stat buffer;
1589 int rv = stat( SM_fname, &buffer );
1590 if( rv != 0 )
1591 {
1592 m_restart = true;
1593 }
1594
1595 if( buffer.st_ino != inode )
1596 {
1597#ifdef SW_DEBUG
1598 std::cerr << "Restarting due to inode . . . \n";
1599#endif
1600 m_restart = true;
1601 }
1602 }
1603 }
1604
1605 ///\todo might still be writing here, so must check
1606 // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1608 {
1609 // Here, if there is at least 1 image, then write
1610 if( ( m_currImage - m_nextChunkStart > 0 ) )
1611 {
1615
1617
1618 std::cerr << __FILE__ << " " << __LINE__ << " WRITING ON RESTART " << last_cnt0 << "\n";
1619 // Now tell the writer to get going
1620 if( sem_post( &m_swSemaphore ) < 0 )
1621 {
1622 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1623 return;
1624 }
1625 }
1626 else
1627 {
1629 }
1630
1631 while( m_writing != NOT_WRITING )
1632 {
1633 std::cerr << __FILE__ << " " << __LINE__ << " WAITING TO FINISH WRITING " << last_cnt0 << "\n";
1634 sleep( 1 );
1635 }
1636 }
1637
1638 if( m_rawImageCircBuff )
1639 {
1642 }
1643
1644 if( m_timingCircBuff )
1645 {
1647 m_timingCircBuff = 0;
1648 }
1649
1650 if( opened )
1651 {
1652 if( m_semaphoreNumber >= 0 )
1653 {
1654 ///\todo is this release necessary with closeIM?
1655 image.semReadPID[m_semaphoreNumber] = 0; // release semaphore
1656 }
1658 opened = false;
1659 }
1660
1661 } // outer loop, will exit if m_shutdown==true
1662
1663 // One more check
1664 if( m_rawImageCircBuff )
1665 {
1668 }
1669
1670 if( m_timingCircBuff )
1671 {
1673 m_timingCircBuff = 0;
1674 }
1675
1676 if( opened )
1677 {
1678 if( m_semaphoreNumber >= 0 )
1679 {
1680 ///\todo is this release necessary with closeIM?
1681 image.semReadPID[m_semaphoreNumber] = 0; // release semaphore.
1682 }
1683
1685 }
1686}
1687
1689{
    // Thread entry point: hand control to the stream writer main loop of the instance `s`.
    // No null check is made on `s` here.
1690 s->swThreadExec();
1691}
1692
1694{
    // Stream writer thread main loop.
    //
    // Waits until the application state is READY or OPERATING, (re)builds the output
    // file-name buffer m_fname when it is null, then blocks on m_swSemaphore with a
    // timeout and calls doEncode() each time the semaphore is obtained.
    //
    // Exits by:
    //  - return, if clock_gettime fails or doEncode() reports an error (< 0);
    //  - break, on shutdown or on a sem_timedwait error other than EINTR/ETIMEDOUT.
    // m_fname is freed on every exit path visible here, except the doEncode() error return;
    // the one `return -1` visible in doEncode() frees it before returning.
1696
1697 // Wait for the thread starter to finish initializing this thread.
1698 while( m_swThreadInit == true && m_shutdown == 0 )
1699 {
1700 sleep( 1 );
1701 }
1702
1703 while( !m_shutdown )
1704 {
    // Idle (polling once per second) while the app is not READY/OPERATING. The file name
    // buffer is dropped here so that it is rebuilt below once the state is good again.
1705 while( !shutdown() && ( !( state() == stateCodes::READY || state() == stateCodes::OPERATING ) ) )
1706 {
1707 if( m_fname )
1708 {
1709 free( m_fname );
1710 m_fname = nullptr;
1711 }
1712 sleep( 1 );
1713 }
1714
1715 if( shutdown() )
1716 {
1717 break;
1718 }
1719
1720 // This will happen after a reconnection, and could update m_shmimName, etc.
1721 if( m_fname == nullptr )
1722 {
    // Base is "<m_rawimageDir>/<m_outName>_"; the timestamp placeholder after it is
    // overwritten in doEncode() for each file.
1723 m_fnameBase = m_rawimageDir + "/" + m_outName + "_";
1724
1725 m_fnameSz = m_fnameBase.size() + sizeof( "YYYYMMDDHHMMSSNNNNNNNNN.xrif" ); // the sizeof includes the \0
    // NOTE(review): the malloc result is not checked before it is passed to snprintf.
1726 m_fname = reinterpret_cast< char *>(malloc( m_fnameSz ));
1727
1728 snprintf( m_fname, m_fnameSz, "%sYYYYMMDDHHMMSSNNNNNNNNN.xrif", m_fnameBase.c_str() );
1729 }
1730
1731 // at this point fname is not null.
1732
1733 timespec ts;
1734
1735 if( clock_gettime( CLOCK_REALTIME, &ts ) < 0 )
1736 {
1737 log<software_critical>( { __FILE__, __LINE__, errno, 0, "clock_gettime" } );
1738
1739 free( m_fname );
1740 m_fname = nullptr;
1741
1742 return; // will trigger a shutdown
1743 }
1744
    // Absolute deadline for sem_timedwait: now + m_semWaitNSec.
1745 mx::sys::timespecAddNsec( ts, m_semWaitNSec );
1746
1747 if( sem_timedwait( &m_swSemaphore, &ts ) == 0 )
1748 {
    // The semaphore was posted: encode and write the pending chunk.
1749 if( doEncode() < 0 )
1750 {
1751 log<software_critical>( { __FILE__, __LINE__, "error encoding data" } );
1752 return;
1753 }
1754 // Otherwise, success, and we just go on.
1755 }
1756 else
1757 {
1758 // Check for why we timed out
1759 if( errno == EINTR )
1760 {
1761 continue; // This will probably indicate time to shutdown, loop will exit normally if flags set.
1762 }
1763
1764 // ETIMEDOUT just means we should wait more.
1765 // Otherwise, report an error.
1766 if( errno != ETIMEDOUT )
1767 {
1768 log<software_error>( { __FILE__, __LINE__, errno, "sem_timedwait" } );
1769 break;
1770 }
1771 }
1772 } // outer loop, will exit if m_shutdown==true
1773
    // Release the file name buffer on normal exit.
1774 if( m_fname )
1775 {
1776 free( m_fname );
1777 m_fname = nullptr;
1778 }
1779}
1780
1782{
    // Encode the pending range of the circular buffers with xrif and write it to a
    // time-stamped file.
    //
    // Returns 0 if not writing, if there are no frames to write, or after the file has been
    // written (individual xrif / fwrite failures are logged but do not change the return value).
    // Returns -1 only if the output file cannot be opened; in that case m_fname is freed.
1783 if( m_writing == NOT_WRITING )
1784 {
1785 return 0;
1786 }
1787
1788 recordSavingState( true );
1789
1790 // Record these to prevent a change in the other thread
1793 size_t nFrames = m_currSaveStop - saveStart;
    // Size of one frame in bytes.
1794 size_t nBytes = m_width * m_height * m_typeSize;
1795
1796#ifdef SW_DEBUG
1797 std::cerr << "nFrames: " << nFrames << "\n";
1798#endif
1799
1800 if(nFrames == 0) // can happen during a stop: just clean up, there is nothing to write.
1801 {
1802 #ifdef SW_DEBUG
1803 std::cerr << "nothing to write\n";
1804 #endif
1805
1806 recordSavingStats( true );
1807
1808 if( m_writing == STOP_WRITING )
1809 {
1812 }
1813
1814 recordSavingState( true );
1815
1816 return 0;
1817 }
1818 // Configure xrif and copy image data -- this does no allocations
1820 if( rv != XRIF_NOERROR )
1821 {
1822 // This is a big problem. Report it as "ALERT" and go on.
1823 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST" } );
1824 }
1825
1827 if( rv != XRIF_NOERROR )
1828 {
1829 // This may just be out of range, it's only an error.
1830 log<software_error>( { __FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error." } );
1831 }
1832
1834
1835 // Configure xrif and copy timing data -- no allocations
1837 if( rv != XRIF_NOERROR )
1838 {
1839 // This is a big problem. Report it as "ALERT" and go on.
1840 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST." } );
1841 }
1842
1844 if( rv != XRIF_NOERROR )
1845 {
1846 // This may just be out of range, it's only an error.
1847 log<software_error>( { __FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error." } );
1848 }
1849
1850#ifdef SW_DEBUG
1851 for( size_t nF = 0; nF < nFrames; ++nF )
1852 {
1853 std::cerr << " " << ( m_timingCircBuff + saveStart * 5 + nF * 5 )[0] << "\n";
1854 }
1855#endif
1856
    // The timing buffer holds 5 values per frame; copy the records for the frames being saved.
1857 memcpy( m_xrif_timing->raw_buffer, m_timingCircBuff + saveStart * 5, nFrames * 5 * sizeof( uint64_t ) );
1858
1859 rv = xrif_encode( m_xrif );
1860 if( rv != XRIF_NOERROR )
1861 {
1862 // This is a big problem. Report it as "ALERT" and go on.
1863 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST." } );
1864 }
1865
1867 if( rv != XRIF_NOERROR )
1868 {
1869 // This is a big problem. Report it as "ALERT" and go on.
1870 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST." } );
1871 }
1872
1874 if( rv != XRIF_NOERROR )
1875 {
1876 // This is a big problem. Report it as "ALERT" and go on.
1877 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST." } );
1878 }
1879
1881 if( rv != XRIF_NOERROR )
1882 {
1883 // This is a big problem. Report it as "ALERT" and go on.
1884 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST" } );
1885 }
1886
1887 // Now break down the acq time of the first image in the buffer for use in file name
1888 tm uttime; // The broken down time.
    // Elements 1 and 2 of the first saved frame's timing record are viewed in place as a timespec.
    // NOTE(review): this assumes timespec is laid out as two 64-bit fields -- confirm for the target platform.
1889 timespec *fts = reinterpret_cast< timespec *>( m_timingCircBuff + saveStart * 5 + 1 );
1890
1891 if( gmtime_r( &fts->tv_sec, &uttime ) == 0 )
1892 {
1893 // Yell at operator but keep going
1895 { __FILE__, __LINE__, errno, 0, "gmtime_r error. possible loss of timing information." } );
1896 }
1897
1898 // Available size = m_fnameSz-m_fnameBase.size(), rather than assuming sizeof("YYYYMMDDHHMMSSNNNNNNNNN"), in case we
1899 // screwed up somewhere.
1900 rv = snprintf( m_fname + m_fnameBase.size(),
1901 m_fnameSz - m_fnameBase.size(),
1902 "%04i%02i%02i%02i%02i%02i%09i",
1903 uttime.tm_year + 1900,
1904 uttime.tm_mon + 1,
1905 uttime.tm_mday,
1906 uttime.tm_hour,
1907 uttime.tm_min,
1908 uttime.tm_sec,
1909 static_cast<int>( fts->tv_nsec ) );
1910
1911 if( rv != sizeof( "YYYYMMDDHHMMSSNNNNNNNNN" ) - 1 )
1912 {
1913 // Something is very wrong. Keep going to try to get it on disk.
1914 log<software_alert>( { __FILE__, __LINE__, errno, rv, "did not write enough chars to timestamp" } );
1915 }
1916
1917 // Cover up the \0 inserted by snprintf
    // 23 is the length of the "YYYYMMDDHHMMSSNNNNNNNNN" time stamp, so this restores the '.' of ".xrif".
1918 ( m_fname + m_fnameBase.size() )[23] = '.';
1919
1920 FILE *fp_xrif = fopen( m_fname, "wb" );
1921 if( fp_xrif == NULL )
1922 {
1923 // This is it. If we can't write data to disk need to fix.
1924 log<software_alert>( { __FILE__, __LINE__, errno, 0, "failed to open file for writing" } );
1925
1926 free( m_fname );
1927 m_fname = nullptr;
1928
1929 return -1; // will trigger a shutdown
1930 }
1931
    // Written in order: image header, compressed image data, timing header, compressed timing data.
    // A short write is logged and the function carries on to the next section.
1932 size_t bw = fwrite( m_xrif_header, sizeof( uint8_t ), XRIF_HEADER_SIZE, fp_xrif );
1933
1934 if( bw != XRIF_HEADER_SIZE )
1935 {
1937 __LINE__,
1938 errno,
1939 0,
1940 "failure writing header to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1941 // We go on . . .
1942 }
1943
1944 bw = fwrite( m_xrif->raw_buffer, sizeof( uint8_t ), m_xrif->compressed_size, fp_xrif );
1945
1946 if( bw != m_xrif->compressed_size )
1947 {
1949 __LINE__,
1950 errno,
1951 0,
1952 "failure writing data to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1953 }
1954
1956
1957 if( bw != XRIF_HEADER_SIZE )
1958 {
1960 { __FILE__,
1961 __LINE__,
1962 errno,
1963 0,
1964 "failure writing timing header to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1965 }
1966
1967 bw = fwrite( m_xrif_timing->raw_buffer, sizeof( uint8_t ), m_xrif_timing->compressed_size, fp_xrif );
1968
1969 if( bw != m_xrif_timing->compressed_size )
1970 {
1972 { __FILE__,
1973 __LINE__,
1974 errno,
1975 0,
1976 "failure writing timing data to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1977 }
1978
    // NOTE(review): the fclose return value is not checked.
1979 fclose( fp_xrif );
1980
1981 recordSavingStats( true );
1982
1983 if( m_writing == STOP_WRITING )
1984 {
1987 }
1988
1989 recordSavingState( true );
1990
1991 return 0;
1992
1993} // doEncode
1994
// INDI callback for a "new property" request on m_indiP_writing.
//
// Reads the "toggle" switch element of the received property and translates it into a
// requested transition of m_writing:
//   - Off while WRITING or START_WRITING  -> STOP_WRITING
//   - On  while NOT_WRITING               -> START_WRITING
// Any other combination leaves m_writing unchanged. A property without a "toggle"
// element is ignored. All paths visible here return 0; the validation macro may return
// on its own for a mismatched property (its expansion is not shown here).
1995INDI_NEWCALLBACK_DEFN( streamWriter, m_indiP_writing )
1996( const pcf::IndiProperty &ipRecv )
1997{
1998 INDI_VALIDATE_CALLBACK_PROPS( m_indiP_writing, ipRecv );
1999
2000 if( !ipRecv.find( "toggle" ) )
2001 {
2002 return 0;
2003 }
2004
    // Request a stop only if a write is in progress or about to start.
2005 if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::Off &&
2006 ( m_writing == WRITING || m_writing == START_WRITING ) )
2007 {
2008 m_writing = STOP_WRITING;
2009 }
2010
    // Request a start only from the fully idle state.
2011 if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::On && m_writing == NOT_WRITING )
2012 {
2013 m_writing = START_WRITING;
2014 }
2015
2016 return 0;
2017}
2018
2020{
    // Publish the writing state and the xrif statistics over INDI.
    //
    // Nothing is updated while m_writing is START_WRITING or STOP_WRITING. When writing with a
    // valid m_xrif, the "toggle" switch is set On and the m_indiP_xrifStats elements are
    // refreshed; otherwise the "toggle" switch is set Off.
2021 // Only update this if not changing
2022 if( m_writing == NOT_WRITING || m_writing == WRITING )
2023 {
2024 if( m_xrif && m_writing == WRITING )
2025 {
2026 indi::updateSwitchIfChanged( m_indiP_writing, "toggle", pcf::IndiElement::On, m_indiDriver, INDI_OK );
2027 indi::updateIfChanged( m_indiP_xrifStats, "ratio", m_xrif->compression_ratio, m_indiDriver, INDI_BUSY );
    // For each xrif rate: the "*MBsec" element is the rate divided by 1048576 (2^20), and the
    // "*FPS" element is the rate divided by m_width * m_height * m_typeSize.
    // NOTE(review): presumably the xrif rates are in bytes/sec -- confirm against xrif.
2029 m_indiP_xrifStats, "encodeMBsec", m_xrif->encode_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2031 "encodeFPS",
2032 m_xrif->encode_rate / ( m_width * m_height * m_typeSize ),
2034 INDI_BUSY );
2036 m_indiP_xrifStats, "differenceMBsec", m_xrif->difference_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2038 "differenceFPS",
2039 m_xrif->difference_rate / ( m_width * m_height * m_typeSize ),
2041 INDI_BUSY );
2043 m_indiP_xrifStats, "reorderMBsec", m_xrif->reorder_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2045 "reorderFPS",
2046 m_xrif->reorder_rate / ( m_width * m_height * m_typeSize ),
2048 INDI_BUSY );
2050 m_indiP_xrifStats, "compressMBsec", m_xrif->compress_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2052 "compressFPS",
2053 m_xrif->compress_rate / ( m_width * m_height * m_typeSize ),
2055 INDI_BUSY );
2056 }
2057 else
2058 {
2059 indi::updateSwitchIfChanged( m_indiP_writing, "toggle", pcf::IndiElement::Off, m_indiDriver, INDI_OK );
2069 }
2070 }
2071}
2072
2077
2079{
    // Telemetry hook: record the saving state unconditionally (force = true).
2080 return recordSavingState( true );
2081}
2082
2084{
    // Record the current saving state. The last recorded values are kept in function-local
    // statics, which are shared by every call to this function. Returns 0 on every visible path.
2085 static int16_t lastState = -1;
    // -1 converts to the largest uint64_t value, used here as the initial value.
2086 static uint64_t currSaveStart = -1;
2087
    // state is 1 when the condition below holds (it includes m_writing == STOP_WRITING), else 0.
2088 int16_t state;
2090 m_writing == STOP_WRITING ) // Changed from just writing 5/2024
2091 state = 1;
2092 else
2093 state = 0;
2094
2096 {
2098
2099 lastState = state;
2101 }
2102
2103 return 0;
2104}
2105
2107{
    // Record the xrif saving statistics as a telem_saving telemetry entry.
    //
    // An entry is made when any of the six m_xrif statistics differs from the value cached at
    // the previous entry, or when `force` is true. Always returns 0.
    // The caches are function-local statics, shared by every call to this function.
    // The -1 initializers convert to the maximum value for the uint32_t caches.
    // NOTE(review): m_xrif is dereferenced without a null check -- confirm callers guarantee it is set.
2108 static uint32_t last_rawSize = -1;
2109 static uint32_t last_compressedSize = -1;
2110 static float last_encodeRate = -1;
2111 static float last_differenceRate = -1;
2112 static float last_reorderRate = -1;
2113 static float last_compressRate = -1;
2114
2115 if( m_xrif->raw_size != last_rawSize || m_xrif->compressed_size != last_compressedSize ||
2116 m_xrif->encode_rate != last_encodeRate || m_xrif->difference_rate != last_differenceRate ||
2117 m_xrif->reorder_rate != last_reorderRate || m_xrif->compress_rate != last_compressRate || force )
2118 {
    // Sizes are narrowed to uint32_t and rates to float for the telemetry record.
2119 telem<telem_saving>( { (uint32_t)m_xrif->raw_size,
2120 (uint32_t)m_xrif->compressed_size,
2121 (float)m_xrif->encode_rate,
2122 (float)m_xrif->difference_rate,
2123 (float)m_xrif->reorder_rate,
2124 (float)m_xrif->compress_rate } );
2125
    // Cache what was just recorded for the next comparison.
2126 last_rawSize = m_xrif->raw_size;
2127 last_compressedSize = m_xrif->compressed_size;
2128 last_encodeRate = m_xrif->encode_rate;
2129 last_differenceRate = m_xrif->difference_rate;
2130 last_reorderRate = m_xrif->reorder_rate;
2131 last_compressRate = m_xrif->compress_rate;
2132 }
2133
2134 return 0;
2135}
2136
2137} // namespace app
2138} // namespace MagAOX
2139
2140#endif
The base-class for MagAO-X applications.
Definition MagAOXApp.hpp:73
stateCodes::stateCodeT state()
Get the current state code.
int registerIndiPropertyNew(pcf::IndiProperty &prop, int(*)(void *, const pcf::IndiProperty &))
Register an INDI property which is exposed for others to request a New Property for.
int createStandardIndiToggleSw(pcf::IndiProperty &prop, const std::string &name, const std::string &label="", const std::string &group="")
Create a standard R/W INDI switch with a single toggle element.
int m_shutdown
Flag to signal it's time to shutdown. When not 0, the main loop exits.
int shutdown()
Get the value of the shutdown flag.
indiDriver< MagAOXApp > * m_indiDriver
The INDI driver wrapper. Constructed and initialized by execute, which starts and stops communication...
static int log(const typename logT::messageT &msg, logPrioT level=logPrio::LOG_DEFAULT)
Make a log entry.
int threadStart(std::thread &thrd, bool &thrdInit, pid_t &tpid, pcf::IndiProperty &thProp, int thrdPrio, const std::string &cpuset, const std::string &thrdName, thisPtr *thrdThis, Function &&thrdStart)
Start a thread, using this class's privileges to set priority, etc.
std::string MagAOXPath
The base path of the MagAO-X system.
Definition MagAOXApp.hpp:81
pcf::IndiProperty m_indiP_xrifStats
pcf::IndiProperty m_indiP_writing
double m_currImageTime
The write-time of the current image.
std::thread m_fgThread
A separate thread for the actual framegrabbings.
int m_fgThreadPrio
Priority of the framegrabber thread, should normally be > 0.
pid_t m_fgThreadID
F.g. thread PID.
size_t m_circBuffLength
The length of the circular buffer, in frames.
int initialize_xrif()
Initialize the xrif system.
int recordTelem(const telem_saving_state *)
uint8_t m_dataType
The ImageStreamIO type code.
unsigned m_semWaitSec
The time in whole sec to wait on the semaphore, to which m_semWaitNSec is added. Default is 0 nsec.
bool m_fgThreadInit
Synchronizer to ensure f.g. thread initializes before doing dangerous things.
int m_typeSize
The pixel byte depth.
xrif_t m_xrif_timing
The xrif compression handle for image timing data.
uint64_t m_currSaveStart
The circular buffer position at which to start saving.
virtual int appStartup()
Startup functions.
size_t m_writeChunkLength
The number of frames to write at a time.
size_t m_height
The height of the image.
uint64_t m_currSaveStopFrameNo
The frame number of the image at which saving stopped (for logging)
double m_currChunkStartTime
The write-time of the first image in the chunk.
int allocate_circbufs()
Worker function to allocate the circular buffers.
static void getCircBuffLengths(size_t &circBuffLength, double &circBuffSize, size_t &writeChunkLength, size_t maxCircBuffLength, double maxCircBuffSize, size_t maxWriteChunkLength, uint32_t width, uint32_t height, size_t typeSize)
void handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
Handles SIGSEGV and SIGBUS. Sets m_restart to true.
int m_swThreadPrio
Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
int recordSavingStats(bool force=false)
pcf::IndiProperty m_swThreadProp
The property to hold the s.w. thread details.
std::thread m_swThread
A separate thread for the actual writing.
std::string m_fgCpuset
The cpuset for the framegrabber thread. Ignored if empty (the default).
virtual int appLogic()
Implementation of the FSM for the stream writer.
static void fgThreadStart(streamWriter *s)
Thread starter, called by threadStart on thread construction. Calls fgThreadExec.
std::string m_outName
The name to use for outputting files, Default is m_shmimName.
size_t m_maxCircBuffLength
The maximum length of the circular buffer, in frames.
INDI_NEWCALLBACK_DECL(streamWriter, m_indiP_writing)
char * m_xrif_timing_header
Storage for the xrif timing data file header.
~streamWriter() noexcept
Destructor.
int allocate_xrif()
Worker function to configure and allocate the xrif handles.
int setSigSegvHandler()
Sets the handler for SIGSEGV and SIGBUS.
std::string m_swCpuset
The cpuset for the stream writer thread. Ignored if empty (the default).
xrif_t m_xrif
The xrif compression handle for image data.
virtual void setupConfig()
Setup the configuration system (called by MagAOXApp::setup())
int m_semaphoreNumber
The image structure semaphore index.
std::string m_rawimageDir
The path where files will be saved.
static void _handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
int recordSavingState(bool force=false)
int m_writing
Controls whether or not images are being written, and sequences start and stop of writing.
uint64_t m_currChunkStart
The circular buffer starting position of the current to-be-written chunk.
pid_t m_swThreadID
S.w. thread pid.
virtual void loadConfig()
load the configuration system results (called by MagAOXApp::setup())
pcf::IndiProperty m_fgThreadProp
The property to hold the f.g. thread details.
void swThreadExec()
Execute the stream writer main loop.
uint64_t m_nextChunkStart
The circular buffer starting position of the next to-be-written chunk.
static streamWriter * m_selfWriter
char * m_xrif_header
Storage for the xrif image data file header.
double m_maxChunkTime
The maximum time before writing regardless of number of frames.
double m_maxCircBuffSize
The maximum size of the circular buffer in MB.
int doEncode()
Function called when semaphore is raised to do the encode and write.
static void swThreadStart(streamWriter *s)
Thread starter, called by threadStart on thread construction. Calls swThreadExec.
uint64_t m_currSaveStop
The circular buffer position at which to stop saving.
void fgThreadExec()
Execute the frame grabber main loop.
std::string m_shmimName
The name of the shared memory buffer.
dev::telemeter< streamWriter > telemeterT
size_t m_width
The width of the image.
double m_circBuffSize
The size of the circular buffer, in MB.
virtual int appShutdown()
Do any needed shutdown tasks. Currently nothing in this app.
sem_t m_swSemaphore
Semaphore used to synchronize the fg thread and the sw thread.
bool m_swThreadInit
Synchronizer to ensure s.w. thread initializes before doing dangerous things.
#define MAGAOX_rawimageRelPath
The relative path to the raw images directory.
Definition paths.hpp:92
#define INDI_NEWCALLBACK_DEFN(class, prop)
Define the callback for a new property request.
#define REG_INDI_NEWPROP_NOCB(prop, propName, type)
Register a NEW INDI property with the class, with no callback.
#define INDI_NEWCALLBACK(prop)
Get the name of the static callback wrapper for a new property.
@ OPERATING
The device is operating, other than homing.
@ READY
The device is ready for operation, but is not operating.
#define INDI_VALIDATE_CALLBACK_PROPS(prop1, prop2)
Standard check for matching INDI properties in a callback.
#define INDI_IDLE
Definition indiUtils.hpp:27
#define INDI_BUSY
Definition indiUtils.hpp:29
#define INDI_OK
Definition indiUtils.hpp:28
void updateIfChanged(pcf::IndiProperty &p, const std::string &el, const T &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
Definition indiUtils.hpp:92
void updateSwitchIfChanged(pcf::IndiProperty &p, const std::string &el, const pcf::IndiElement::SwitchStateType &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
std::stringstream msg
const pcf::IndiProperty & ipRecv
Definition dm.hpp:24
static constexpr logPrioT LOG_NOTICE
A normal but significant condition.
static constexpr logPrioT LOG_CRITICAL
The process can not continue and will shut down (fatal)
static constexpr logPrioT LOG_WARNING
A condition has occurred which may become an error, but the process continues.
static constexpr logPrioT LOG_DEBUG
Used for debugging.
#define XWC_SEM_WAIT_TS_RETVOID(ts, sec, nsec)
Add the wait time to a timespec for a sem_timedwait call, with no value returned on error.
Definition semUtils.hpp:20
#define NOT_WRITING
#define STOP_WRITING
#define WRITING
#define START_WRITING
A device base class which saves telemetry.
Definition telemeter.hpp:69
int appShutdown()
Perform telemeter application shutdown.
int loadConfig(appConfigurator &config)
Load the device section from an application configurator.
int appLogic()
Perform telemeter application logic.
int setupConfig(appConfigurator &config)
Setup an application configurator for the device section.
int appStartup()
Starts the telemetry log thread.
int checkRecordTimes(const telT &tel, telTs... tels)
Check the time of the last record for each telemetry type and make an entry if needed.
Software CRITICAL log entry.
Software ERR log entry.