API
 
Loading...
Searching...
No Matches
streamWriter.hpp
Go to the documentation of this file.
1/** \file streamWriter.hpp
2 * \brief The MagAO-X Image Stream Writer
3 *
4 * \author Jared R. Males (jaredmales@gmail.com)
5 *
6 * \ingroup streamWriter_files
7 */
8
9#ifndef streamWriter_hpp
10#define streamWriter_hpp
11
12#include <atomic>
13#include <chrono>
14#include <filesystem>
15#include <thread>
16
17#include <ImageStreamIO/ImageStruct.h>
18#include <ImageStreamIO/ImageStreamIO.h>
19
20#include <xrif/xrif.h>
21
22#include <mx/sys/timeUtils.hpp>
23
24#include "../../libMagAOX/libMagAOX.hpp" //Note this is included on command line to trigger pch
25#include "../../magaox_git_version.h"
26
27#define NOT_WRITING ( 0 )
28#define START_WRITING ( 1 )
29#define WRITING ( 2 )
30#define STOP_WRITING ( 3 )
31
32// #define SW_DEBUG
33
34namespace MagAOX
35{
36namespace app
37{
38
39/** \defgroup streamWriter ImageStreamIO Stream Writing
40 * \brief Writes the contents of an ImageStreamIO image stream to disk.
41 *
42 * <a href="../handbook/operating/software/apps/streamWriter.html">Application Documentation</a>
43 *
44 * \ingroup apps
45 *
46 */
47
48/** \defgroup streamWriter_files ImageStreamIO Stream Writing
49 * \ingroup streamWriter
50 */
51
52/** MagAO-X application to control writing ImageStreamIO streams to disk.
53 *
54 * \ingroup streamWriter
55 *
56 */
57class streamWriter : public MagAOXApp<>, public dev::telemeter<streamWriter>
58{
60
61 friend class dev::telemeter<streamWriter>;
62
63 // Give the test harness access.
64 friend class streamWriter_test;
66
67 protected:
68 /** \name configurable parameters
69 *@{
70 */
71
72 std::string m_rawimageDir; ///< The path where files will be saved.
73
74 size_t m_maxCircBuffLength{ 1024 }; ///< The maximum length of the circular buffer, in frames
75
76 double m_maxCircBuffSize{ 2048 }; ///< The maximum size of the circular buffer in MB.
77
78 size_t m_maxWriteChunkLength{ 512 }; /**< The maximum number of frames to write at a time. Must
79 be an integer factor of m_maxCircBuffLength.*/
80
81 double m_maxChunkTime{ 10 }; ///< The maximum time before writing regardless of number of frames.
82
84 1.0 }; ///< Seconds to wait after a stop-writing command before flushing the pending data without a new frame.
85
86 bool m_startWriting{ false }; ///< Whether writing should be armed automatically at application startup.
87
88 std::string m_shmimName; ///< The name of the shared memory buffer.
89
90 std::string m_outName; ///< The name to use for outputting files, Default is m_shmimName.
91
92 int m_semaphoreNumber{ 7 }; ///< The image structure semaphore index.
93
94 unsigned m_semWaitSec{ 0 }; /**< The time in whole sec to wait on the semaphore,
95 to which m_semWaitNSec is added. Default is 0 sec.*/
96
97 unsigned m_semWaitNSec{ 500000000 }; /**< The time in nsec to wait on the semaphore, added to m_semWaitSec.
98 Max is 999999999. Default is 5e8 nsec. */
99
101 true }; ///< Whether missed-data backlog summaries should be emitted as warnings instead of informational logs.
102
103 int m_lz4accel{ 1 };
104
105 bool m_compress{ true };
106
107 ///@}
108
109 size_t m_circBuffLength{ 1024 }; ///< The length of the circular buffer, in frames
110 double m_circBuffSize{ 2048.0 }; ///< The size of the circular buffer, in MB
111 size_t m_writeChunkLength{ 512 }; ///< The number of frames to write at a time
112
113 size_t m_width{ 0 }; ///< The width of the image
114 size_t m_height{ 0 }; ///< The height of the image
115 uint8_t m_dataType{ 0 }; ///< The ImageStreamIO type code.
116 int m_typeSize{ 0 }; ///< The pixel byte depth
117
118 char *m_rawImageCircBuff{ nullptr };
120
121 size_t m_currImage{ 0 };
122
123 uint64_t m_currImageTime{ 0 }; ///< The write-time of the current image in nanoseconds.
124
125 uint64_t m_currChunkStartTime{ 0 }; ///< The write-time of the first image in the chunk in nanoseconds.
126
127 std::atomic<uint64_t> m_skippedFrameCount{
128 0 }; ///< Count of skipped frames accumulated by the framegrabber thread since the last summary log.
129
130 std::atomic<uint64_t> m_repeatSemaphoreCount{
131 0 }; ///< Count of repeated semaphore wakes with unchanged frame count since the last summary log.
132
133 double m_skipSummaryIntervalSec{ 10.0 }; ///< Current interval between summary skip logs.
134
135 double m_nextSkipSummaryTime{ 0.0 }; ///< Time after which the next summary skip log may be emitted.
136
137 // Writer book-keeping:
138 int m_writing{ NOT_WRITING }; /**< Controls whether or not images are being written,
139 and sequences start and stop of writing.*/
140
141 std::atomic<bool> m_writePending{
142 false }; ///< Whether the writer thread still owns a queued save window and may touch the circular buffers.
143
144 double m_stopWriteDeadline{ 0.0 }; ///< Absolute time after which a stop-writing request should flush without a
145 ///< new frame.
146
148 5.0 }; ///< Seconds to wait for the writer thread to finish a queued flush during restart cleanup.
149
151 false }; ///< Tracks whether reconnect cleanup should resume writing immediately on the replacement stream.
152
153 uint64_t m_currChunkStart{ 0 }; ///< The circular buffer starting position of the current to-be-written chunk.
154 uint64_t m_nextChunkStart{ 0 }; ///< The circular buffer starting position of the next to-be-written chunk.
155
156 uint64_t m_currSaveStart{ 0 }; ///< The circular buffer position at which to start saving.
157 uint64_t m_currSaveStop{ 0 }; ///< The circular buffer position at which to stop saving.
158
159 uint64_t m_currSaveStopFrameNo{ 0 }; ///< The frame number of the image at which saving stopped (for logging)
160
161 /// The xrif compression handle for image data
162 xrif_t m_xrif{ nullptr };
163
164 /// Storage for the xrif image data file header
165 char *m_xrif_header{ nullptr };
166
167 /// The xrif compression handle for timing data
169
170 /// Storage for the xrif timing data file header
171 char *m_xrif_timing_header{ nullptr };
172
173 std::string m_outFilePath; ///< The full path for the latest output file
174
175 public:
176 /// Default c'tor
177 streamWriter();
178
179 /// Destructor
181
182 /// Setup the configuration system (called by MagAOXApp::setup())
183 virtual void setupConfig();
184
185 /// load the configuration system results (called by MagAOXApp::setup())
186 virtual void loadConfig();
187
188 /// Startup functions
189 /** Sets up the INDI vars.
190 *
191 */
192 virtual int appStartup();
193
194 /// Implementation of the FSM for the stream writer
195 virtual int appLogic();
196
197 /// Do any needed shutdown tasks: joins the framegrabber and stream writer threads and clears the xrif handles.
198 virtual int appShutdown();
199
200 protected:
201 /** \name SIGSEGV & SIGBUS signal handling
202 * These signals occur as a result of an ImageStreamIO source server resetting (e.g. changing frame sizes).
203 * When they occur a restart of the framegrabber and framewriter thread main loops is triggered.
204 *
205 * @{
206 */
207 bool m_restart{ false };
208
209 static streamWriter *m_selfWriter; ///< Static pointer to this (set in constructor). Used for getting out of the
210 ///< static SIGSEGV handler.
211
212 /// Initialize the xrif system.
213 /** Allocates the handles and headers pointers.
214 *
215 * \returns 0 on success.
216 * \returns -1 on error.
217 */
218 int initialize_xrif();
219
220 /// Sets the handler for SIGSEGV and SIGBUS
221 /** These are caused by ImageStreamIO server resets.
222 */
223 int setSigSegvHandler();
224
225 /// The handler called when SIGSEGV or SIGBUS is received, which will be due to ImageStreamIO server resets. Just a
226 /// wrapper for handlerSigSegv.
227 static void _handlerSigSegv( int signum, siginfo_t *siginf, void *ucont );
228
229 /// Handles SIGSEGV and SIGBUS. Sets m_restart to true.
230 void handlerSigSegv( int signum, siginfo_t *siginf, void *ucont );
231 ///@}
232
233 /** \name Framegrabber Thread
234 * This thread monitors the ImageStreamIO buffer and copies its images to the circular buffer.
235 *
236 * @{
237 */
238 int m_fgThreadPrio{ 1 }; ///< Priority of the framegrabber thread, should normally be > 0.
239
240 std::string m_fgCpuset; ///< The cpuset for the framegrabber thread. Ignored if empty (the default).
241
242 std::thread m_fgThread; ///< A separate thread for the actual framegrabbings
243
244 bool m_fgThreadInit{ true }; ///< Synchronizer to ensure f.g. thread initializes before doing dangerous things.
245
246 pid_t m_fgThreadID{ 0 }; ///< F.g. thread PID.
247
248 pcf::IndiProperty m_fgThreadProp; ///< The property to hold the f.g. thread details.
249
250 public:
251 static void getCircBuffLengths( size_t &circBuffLength,
252 double &circBuffSize,
253 size_t &writeChunkLength,
254 size_t maxCircBuffLength,
255 double maxCircBuffSize,
256 size_t maxWriteChunkLength,
257 uint32_t width,
258 uint32_t height,
259 size_t typeSize );
260
261 protected:
262 /// Worker function to allocate the circular buffers.
263 /** This takes place in the fg thread after connecting to the stream.
264 *
265 * \returns 0 on success.
266 * \returns -1 on error.
267 */
268 int allocate_circbufs();
269
270 /// Worker function to configure and allocate the xrif handles.
271 /** This takes place in the fg thread after connecting to the stream.
272 *
273 * \returns 0 on success.
274 * \returns -1 on error.
275 */
276 int allocate_xrif();
277
278 /// Release the circular buffers owned by the framegrabber thread.
279 void release_circbufs();
280
281 /// Wait for the writer thread to finish any queued save work before buffer teardown.
282 bool waitForWriteCompletion( uint64_t saveStopFrameNo /**< [in] The last frame number associated with the
283 queued save for timeout logging. */ );
284
285 /// Thread starter, passed as the entry point when the framegrabber thread is constructed. Calls fgThreadExec.
286 static void fgThreadStart( streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */ );
287
288 /// Execute the frame grabber main loop.
289 void fgThreadExec();
290
291 ///@}
292
293 /** \name Stream Writer Thread
294 * This thread writes chunks of the circular buffer to disk.
295 *
296 * @{
297 */
298 int m_swThreadPrio{ 1 }; ///< Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
299
300 std::string m_swCpuset; ///< The cpuset for the stream writer thread. Ignored if empty (the default).
301
302 sem_t m_swSemaphore; ///< Semaphore used to synchronize the fg thread and the sw thread.
303
304 std::thread m_swThread; ///< A separate thread for the actual writing
305
306 bool m_swThreadInit{ true }; ///< Synchronizer to ensure s.w. thread initializes before doing dangerous things.
307
308 pid_t m_swThreadID{ 0 }; ///< S.w. thread pid.
309
310 pcf::IndiProperty m_swThreadProp; ///< The property to hold the s.w. thread details.
311
312 /// Thread starter, passed as the entry point when the stream writer thread is constructed. Calls swThreadExec.
313 static void swThreadStart( streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */ );
314
315 /// Execute the stream writer main loop.
316 void swThreadExec();
317
318 /// Function called when semaphore is raised to do the encode and write.
319 int doEncode();
320 ///@}
321
322 // INDI:
323 protected:
324 // declare our properties
325 pcf::IndiProperty m_indiP_writing;
326
327 pcf::IndiProperty m_indiP_xrifStats;
328
329 public:
331
332 void updateINDI();
333
334 /** \name Telemeter Interface
335 *
336 * @{
337 */
338 int checkRecordTimes();
339
340 int recordTelem( const telem_saving_state * );
341
342 int recordSavingState( bool force = false );
343 int recordSavingStats( bool force = false );
344
345 ///@}
346};
347
348// Set self pointer to null so app starts up uninitialized.
350
/// Default constructor.
/** Forwards the git SHA1 and repo-modified macros to the MagAOXApp base,
 * turns power management off for this app, and records this instance in the
 * static m_selfWriter pointer.
 */
351streamWriter::streamWriter() : MagAOXApp( MAGAOX_CURRENT_SHA1, MAGAOX_REPO_MODIFIED )
352{
353 m_powerMgtEnabled = false;
354
355 m_selfWriter = this; // static self pointer, used to get out of the static SIGSEGV handler
356
357 return;
358}
359
361{
363
364 if( m_xrif )
366
367 if( m_xrif_header )
369
370 if( m_xrif_timing )
372
375
376 return;
377}
378
380{
381 config.add( "writer.savePath",
382 "",
383 "writer.savePath",
384 argType::Required,
385 "writer",
386 "savePath",
387 false,
388 "string",
389 "The absolute path where images are saved. Will use MagAO-X default if not set." );
390
391 config.add( "writer.maxCircBuffLength",
392 "",
393 "writer.maxCircBuffLength",
394 argType::Required,
395 "writer",
396 "maxCircBuffLength",
397 false,
398 "size_t",
399 "The maximum length in frames of the circular buffer. Should be an integer multiple of and larger than "
400 "maxWriteChunkLength." );
401
402 config.add( "writer.maxCircBuffSize",
403 "",
404 "writer.maxCircBuffSize",
405 argType::Required,
406 "writer",
407 "maxCircBuffSize",
408 false,
409 "double",
410 "The maximum size in MB of the circular buffer. Should be sized to hold at least 2 of the maximum "
411 "frame size." );
412
413 config.add(
414 "writer.maxWriteChunkLength",
415 "",
416 "writer.maxWriteChunkLength",
417 argType::Required,
418 "writer",
419 "maxWriteChunkLength",
420 false,
421 "size_t",
422 "The maximum length in frames of the chunks to write to disk. Should be smaller than maxCircBuffLength." );
423
424 config.add( "writer.maxChunkTime",
425 "",
426 "writer.maxChunkTime",
427 argType::Required,
428 "writer",
429 "maxChunkTime",
430 false,
431 "float",
432 "The max length in seconds of the chunks to write to disk. Default is 60 sec." );
433
434 config.add( "writer.stopTimeout",
435 "",
436 "writer.stopTimeout",
437 argType::Required,
438 "writer",
439 "stopTimeout",
440 false,
441 "float",
442 "The max time in seconds to wait after a stop-writing command for the next frame before flushing the "
443 "pending data and returning to the idle state." );
444
445 config.add( "writer.startWriting",
446 "",
447 "writer.startWriting",
448 argType::Required,
449 "writer",
450 "startWriting",
451 false,
452 "bool",
453 "Flag controlling whether writing is armed automatically at application startup. Default is false." );
454
455 config.add( "writer.threadPrio",
456 "",
457 "writer.threadPrio",
458 argType::Required,
459 "writer",
460 "threadPrio",
461 false,
462 "int",
463 "The real-time priority of the stream writer thread." );
464
465 config.add( "writer.cpuset",
466 "",
467 "writer.cpuset",
468 argType::Required,
469 "writer",
470 "cpuset",
471 false,
472 "int",
473 "The cpuset for the writer thread." );
474
475 config.add( "writer.compress",
476 "",
477 "writer.compress",
478 argType::Required,
479 "writer",
480 "compress",
481 false,
482 "bool",
483 "Flag to set whether compression is used. Default true." );
484
485 config.add( "writer.lz4accel",
486 "",
487 "writer.lz4accel",
488 argType::Required,
489 "writer",
490 "lz4accel",
491 false,
492 "int",
493 "The LZ4 acceleration parameter. Larger is faster, but lower compression." );
494
495 config.add( "writer.outName",
496 "",
497 "writer.outName",
498 argType::Required,
499 "writer",
500 "outName",
501 false,
502 "int",
503 "The name to use for output files. Default is the shmimName." );
504
505 config.add( "framegrabber.shmimName",
506 "",
507 "framegrabber.shmimName",
508 argType::Required,
509 "framegrabber",
510 "shmimName",
511 false,
512 "int",
513 "The name of the stream to monitor. From /tmp/shmimName.im.shm." );
514
515 config.add( "framegrabber.semaphoreNumber",
516 "",
517 "framegrabber.semaphoreNumber",
518 argType::Required,
519 "framegrabber",
520 "semaphoreNumber",
521 false,
522 "int",
523 "The semaphore to wait on. Default is 7." );
524
525 config.add( "framegrabber.semWait",
526 "",
527 "framegrabber.semWait",
528 argType::Required,
529 "framegrabber",
530 "semWait",
531 false,
532 "int",
533 "The time in nsec to wait on the semaphore. Max is 999999999. Default is 5e8 nsec." );
534
535 config.add( "framegrabber.warnMissedData",
536 "",
537 "framegrabber.warnMissedData",
538 argType::Required,
539 "framegrabber",
540 "warnMissedData",
541 false,
542 "bool",
543 "Whether missed-data backlog summaries should be emitted at warning priority. Default is true." );
544
545 config.add( "framegrabber.threadPrio",
546 "",
547 "framegrabber.threadPrio",
548 argType::Required,
549 "framegrabber",
550 "threadPrio",
551 false,
552 "int",
553 "The real-time priority of the framegrabber thread." );
554
555 config.add( "framegrabber.cpuset",
556 "",
557 "framegrabber.cpuset",
558 argType::Required,
559 "framegrabber",
560 "cpuset",
561 false,
562 "string",
563 "The cpuset for the framegrabber thread." );
564
565 telemeterT::setupConfig( config );
566}
567
569{
570
571 config( m_maxCircBuffLength, "writer.maxCircBuffLength" );
572 config( m_maxCircBuffSize, "writer.maxCircBuffSize" );
573 config( m_maxWriteChunkLength, "writer.maxWriteChunkLength" );
574 config( m_maxChunkTime, "writer.maxChunkTime" );
575 config( m_writeStopTimeout, "writer.stopTimeout" );
576 if( m_writeStopTimeout < 0 )
577 {
579 }
580 config( m_startWriting, "writer.startWriting" );
581 config( m_swThreadPrio, "writer.threadPrio" );
582 config( m_swCpuset, "writer.cpuset" );
583 config( m_compress, "writer.compress" );
584 config( m_lz4accel, "writer.lz4accel" );
586 {
588 }
590 {
592 }
593
594 config( m_shmimName, "framegrabber.shmimName" );
595
597 config( m_outName, "writer.outName" );
598
599 config( m_semaphoreNumber, "framegrabber.semaphoreNumber" );
600 config( m_semWaitNSec, "framegrabber.semWait" );
601 config( m_warnMissedData, "framegrabber.warnMissedData" );
602
603 config( m_fgThreadPrio, "framegrabber.threadPrio" );
604 config( m_fgCpuset, "framegrabber.cpuset" );
605
606 // Set some defaults
607 // Setup default saving path
608 std::string tmpstr = mx::sys::getEnv( MAGAOX_env_rawimage );
609 if( tmpstr == "" )
610 {
612 }
613 m_rawimageDir = basePath() + "/" + tmpstr;
614
615 config( m_rawimageDir, "writer.savePath" );
616
617 if( telemeterT::loadConfig( config ) < 0 )
618 {
619 log<text_log>( "Error during telemeter config", logPrio::LOG_CRITICAL );
620 m_shutdown = true;
621 }
622}
623
625{
626 // Create save directory.
627 errno = 0;
628 if( mkdir( m_rawimageDir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH ) < 0 )
629 {
630 if( errno != EEXIST )
631 {
632 std::stringstream logss;
633 logss << "Failed to create image directory (" << m_rawimageDir << "). Errno says: " << strerror( errno );
635
636 return -1;
637 }
638 }
639
640 // set up the INDI properties
643
644 // Register the stats INDI property
645 REG_INDI_NEWPROP_NOCB( m_indiP_xrifStats, "xrif", pcf::IndiProperty::Number );
646 m_indiP_xrifStats.setLabel( "xrif compression performance" );
647
648 indi::addNumberElement<float>( m_indiP_xrifStats, "ratio", 0, 1.0, 0.0, "%0.2f", "Compression Ratio" );
649
650 indi::addNumberElement<float>( m_indiP_xrifStats,
651 "differenceMBsec",
652 0,
653 std::numeric_limits<float>::max(),
654 0.0,
655 "%0.2f",
656 "Differencing Rate [MB/sec]" );
657
658 indi::addNumberElement<float>( m_indiP_xrifStats,
659 "reorderMBsec",
660 0,
661 std::numeric_limits<float>::max(),
662 0.0,
663 "%0.2f",
664 "Reordering Rate [MB/sec]" );
665
666 indi::addNumberElement<float>( m_indiP_xrifStats,
667 "compressMBsec",
668 0,
669 std::numeric_limits<float>::max(),
670 0.0,
671 "%0.2f",
672 "Compression Rate [MB/sec]" );
673
674 indi::addNumberElement<float>( m_indiP_xrifStats,
675 "encodeMBsec",
676 0,
677 std::numeric_limits<float>::max(),
678 0.0,
679 "%0.2f",
680 "Total Encoding Rate [MB/sec]" );
681
682 indi::addNumberElement<float>( m_indiP_xrifStats,
683 "differenceFPS",
684 0,
685 std::numeric_limits<float>::max(),
686 0.0,
687 "%0.2f",
688 "Differencing Rate [f.p.s.]" );
689
690 indi::addNumberElement<float>( m_indiP_xrifStats,
691 "reorderFPS",
692 0,
693 std::numeric_limits<float>::max(),
694 0.0,
695 "%0.2f",
696 "Reordering Rate [f.p.s.]" );
697
698 indi::addNumberElement<float>( m_indiP_xrifStats,
699 "compressFPS",
700 0,
701 std::numeric_limits<float>::max(),
702 0.0,
703 "%0.2f",
704 "Compression Rate [f.p.s.]" );
705
706 indi::addNumberElement<float>( m_indiP_xrifStats,
707 "encodeFPS",
708 0,
709 std::numeric_limits<float>::max(),
710 0.0,
711 "%0.2f",
712 "Total Encoding Rate [f.p.s.]" );
713
714 // Now set up the framegrabber and writer threads.
715 // - need SIGSEGV and SIGBUS handling for ImageStreamIO restarts
716 // - initialize the semaphore
717 // - start the threads
718
719 if( setSigSegvHandler() < 0 )
720 return log<software_error, -1>( { __FILE__, __LINE__ } );
721
722 if( sem_init( &m_swSemaphore, 0, 0 ) < 0 )
723 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "Initializing S.W. semaphore" } );
724
725 // Check if we have a safe writeChunkLength
727 {
728 return log<software_critical, -1>(
729 { __FILE__, __LINE__, "Write chunk length is not a divisor of circular buffer length." } );
730 }
731
732 if( initialize_xrif() < 0 )
733 {
735 }
736
737 if( m_startWriting )
738 {
740 }
741
748 "framegrabber",
749 this,
750 fgThreadStart ) < 0 )
751 {
752 return log<software_critical, -1>( { __FILE__, __LINE__ } );
753 }
754
761 "streamwriter",
762 this,
763 swThreadStart ) < 0 )
764 {
766 }
767
768 if( telemeterT::appStartup() < 0 )
769 {
770 return log<software_error, -1>( { __FILE__, __LINE__ } );
771 }
772
773 return 0;
774}
775
777{
778 double now = mx::sys::get_curr_time();
779 if( m_nextSkipSummaryTime == 0 )
780 {
782 }
783
785 {
789
792
793 if( shouldLog )
794 {
795 std::string msg = "stream ingest backlog: ";
796 bool appended = false;
797
799 {
800 msg += std::to_string( skippedFrames ) + " skipped frames";
801 appended = true;
802 }
803
804 if( repeatedSems > 0 )
805 {
806 if( appended )
807 {
808 msg += ", ";
809 }
810 msg += std::to_string( repeatedSems ) + " repeated semaphore wakes";
811 }
812 msg += " in last " + std::to_string( static_cast<int>( summaryInterval ) ) + " sec";
813
815
817 if( m_skipSummaryIntervalSec > 60.0 )
818 {
820 }
821 }
822 else
823 {
825 }
826
828 }
829
830 // first do a join check to see if other threads have exited.
831 // these will throw if the threads are really gone
832 try
833 {
834 if( pthread_tryjoin_np( m_fgThread.native_handle(), 0 ) == 0 )
835 {
836 log<software_error>( { __FILE__, __LINE__, "framegrabber thread has exited" } );
837 return -1;
838 }
839 }
840 catch( ... )
841 {
842 log<software_error>( { __FILE__, __LINE__, "framegrabber thread has exited" } );
843 return -1;
844 }
845
846 try
847 {
848 if( pthread_tryjoin_np( m_swThread.native_handle(), 0 ) == 0 )
849 {
850 log<software_error>( { __FILE__, __LINE__, "stream thread has exited" } );
851 return -1;
852 }
853 }
854 catch( ... )
855 {
856 log<software_error>( { __FILE__, __LINE__, "streamwriter thread has exited" } );
857 return -1;
858 }
859
860 switch( m_writing )
861 {
862 case NOT_WRITING:
864 break;
865 default:
867 }
868
870 {
871 if( telemeterT::appLogic() < 0 )
872 {
874 return 0;
875 }
876 }
877
878 updateINDI();
879
880 return 0;
881}
882
884{
886 m_writePending = false;
889 updateINDI();
890
891 try
892 {
893 if( m_fgThread.joinable() )
894 {
895 m_fgThread.join();
896 }
897 }
898 catch( ... )
899 {
900 }
901
902 try
903 {
904 if( m_swThread.joinable() )
905 {
906 m_swThread.join();
907 }
908 }
909 catch( ... )
910 {
911 }
912
914
915 if( m_xrif )
916 {
918 m_xrif = nullptr;
919 }
920
921 if( m_xrif_timing )
922 {
924 m_xrif_timing = nullptr;
925 }
926
928
929 return 0;
930}
931
933{
935 if( rv != XRIF_NOERROR )
936 {
937 return log<software_critical, -1>(
938 { __FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error." } );
939 }
940
941 if( m_compress )
942 {
944 if( rv != XRIF_NOERROR )
945 {
946 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
947 }
948 }
949 else
950 {
951 std::cerr << "not compressing . . . \n";
953 if( rv != XRIF_NOERROR )
954 {
955 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
956 }
957 }
958
959 errno = 0;
960
961 m_xrif_header = reinterpret_cast<char *>( malloc( XRIF_HEADER_SIZE * sizeof( char ) ) );
962 if( m_xrif_header == NULL )
963 {
964 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "xrif header allocation failed." } );
965 }
966
968 if( rv != XRIF_NOERROR )
969 {
970 return log<software_critical, -1>(
971 { __FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error." } );
972 }
973
974 // m_xrif_timing->reorder_method = XRIF_REORDER_NONE;
976 if( rv != XRIF_NOERROR )
977 {
978 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
979 }
980
981 errno = 0;
982
983 m_xrif_timing_header = reinterpret_cast<char *>( malloc( XRIF_HEADER_SIZE * sizeof( char ) ) );
985 {
986 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "xrif header allocation failed." } );
987 }
988
989 return 0;
990}
991
993{
994 struct sigaction act;
995 sigset_t set;
996
997 act.sa_sigaction = &streamWriter::_handlerSigSegv;
998 act.sa_flags = SA_SIGINFO;
999 sigemptyset( &set );
1000 act.sa_mask = set;
1001
1002 errno = 0;
1003 if( sigaction( SIGSEGV, &act, 0 ) < 0 )
1004 {
1005 std::string logss = "Setting handler for SIGSEGV failed. Errno says: ";
1006 logss += strerror( errno );
1007
1009
1010 return -1;
1011 }
1012
1013 errno = 0;
1014 if( sigaction( SIGBUS, &act, 0 ) < 0 )
1015 {
1016 std::string logss = "Setting handler for SIGBUS failed. Errno says: ";
1017 logss += strerror( errno );
1018
1020
1021 return -1;
1022 }
1023
1024 log<text_log>( "Installed SIGSEGV/SIGBUS signal handler.", logPrio::LOG_DEBUG );
1025
1026 return 0;
1027}
1028
1029void streamWriter::_handlerSigSegv( int signum, siginfo_t *siginf, void *ucont )
1030{
1032}
1033
/// Instance-level handler for SIGSEGV and SIGBUS.
/** Only sets m_restart to true; none of the signal details are used.
 *
 * \param signum [in] the signal number (unused)
 * \param siginf [in] the signal info structure (unused)
 * \param ucont [in] the user context pointer (unused)
 */
1034void streamWriter::handlerSigSegv( int signum, siginfo_t *siginf, void *ucont )
1035{
1036 static_cast<void>( signum ); // the three casts mark the arguments as intentionally unused
1037 static_cast<void>( siginf );
1038 static_cast<void>( ucont );
1039
1040 m_restart = true; // tested by the framegrabber loop, which resets it to false when it restarts
1041
1042 return;
1043}
1044
1045void streamWriter::getCircBuffLengths( size_t &circBuffLength,
1046 double &circBuffSize,
1047 size_t &writeChunkLength,
1048 size_t maxCircBuffLength,
1049 double maxCircBuffSize,
1050 size_t maxWriteChunkLength,
1051 uint32_t width,
1052 uint32_t height,
1053 size_t typeSize )
1054{
1055 static constexpr double MB = 1048576.0;
1056
1057 size_t isz = width * height * typeSize * maxCircBuffLength;
1058
1059 if( isz <= maxCircBuffSize * MB )
1060 {
1062 circBuffSize = isz / MB;
1064
1065 return;
1066 }
1067
1068 circBuffLength = maxCircBuffSize * MB / ( width * height * typeSize );
1069
1070 if( circBuffLength % 2 == 1 )
1071 {
1073 }
1074
1075 circBuffSize = ( width * height * typeSize * circBuffLength ) / MB;
1076
1078
1079 if( circBuffLength == 0 )
1080 {
1081 return;
1082 }
1083
1084 if( writeChunkLength == 0 )
1085 {
1086 writeChunkLength = 1;
1087 }
1088
1089 while( circBuffLength % writeChunkLength != 0 )
1090 {
1092 }
1093
1094 return;
1095}
1096
1098{
1099
1106 m_width,
1107 m_height,
1108 m_typeSize );
1109
1110 if( m_circBuffLength < 2 )
1111 {
1112 return log<software_critical, -1>( { __FILE__, __LINE__, "frame size too large to fit in maxCircBuffSize" } );
1113 }
1114
1116 {
1117 return log<software_critical, -1>(
1118 { __FILE__, __LINE__, "writeChunkLength is not smaller than circBuffLength" } );
1119 }
1120
1122 {
1123 return log<software_critical, -1>(
1124 { __FILE__, __LINE__, "writeChunkLength is not an integer factor of circBuffLength" } );
1125 }
1126
1127 std::string msg = "Set circ buff length: " + std::to_string( m_circBuffLength ) + " frames (";
1128 msg += std::to_string( m_circBuffSize ) + " MB). Write chunk length: " + std::to_string( m_writeChunkLength );
1129 msg += " frames.";
1130
1132
1133 if( m_rawImageCircBuff )
1134 {
1136 }
1137
1138 errno = 0;
1139 m_rawImageCircBuff = reinterpret_cast<char *>( malloc( m_width * m_height * m_typeSize * m_circBuffLength ) );
1140
1141 if( m_rawImageCircBuff == NULL )
1142 {
1143 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "buffer allocation failure" } );
1144 }
1145
1146 if( m_timingCircBuff )
1147 {
1149 }
1150
1151 errno = 0;
1152 m_timingCircBuff = reinterpret_cast<uint64_t *>( malloc( 5 * sizeof( uint64_t ) * m_circBuffLength ) );
1153 if( m_timingCircBuff == NULL )
1154 {
1155 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "buffer allocation failure" } );
1156 }
1157
1158 return 0;
1159}
1160
1162{
1163 // Set up the image data xrif handle
1165
1166 if( m_compress )
1167 {
1169 if( rv != XRIF_NOERROR )
1170 {
1171 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1172 }
1173 }
1174 else
1175 {
1176 std::cerr << "not compressing . . . \n";
1178 if( rv != XRIF_NOERROR )
1179 {
1180 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1181 }
1182 }
1183
1185 if( rv != XRIF_NOERROR )
1186 {
1187 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_set_size error." } );
1188 }
1189
1191 if( rv != XRIF_NOERROR )
1192 {
1193 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_raw error." } );
1194 }
1195
1197 if( rv != XRIF_NOERROR )
1198 {
1199 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error." } );
1200 }
1201
1202 // Set up the timing data xrif handle
1204 if( rv != XRIF_NOERROR )
1205 {
1206 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1207 }
1208
1210 if( rv != XRIF_NOERROR )
1211 {
1212 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_set_size error." } );
1213 }
1214
1216 if( rv != XRIF_NOERROR )
1217 {
1218 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_raw error." } );
1219 }
1220
1222 if( rv != XRIF_NOERROR )
1223 {
1224 return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error." } );
1225 }
1226
1227 return 0;
1228}
1229
1231{
1232 if( m_rawImageCircBuff )
1233 {
1235 m_rawImageCircBuff = nullptr;
1236 }
1237
1238 if( m_timingCircBuff )
1239 {
1241 m_timingCircBuff = nullptr;
1242 }
1243}
1244
1245bool streamWriter::waitForWriteCompletion( uint64_t saveStopFrameNo )
1246{
1247 const auto timeout = std::chrono::duration<double>( m_writeCompletionTimeout );
1248 const auto deadline = std::chrono::steady_clock::now() + timeout;
1249 auto nextLog = std::chrono::steady_clock::now();
1250
1251 while( m_writePending )
1252 {
1253 const auto now = std::chrono::steady_clock::now();
1254
1255 if( now >= deadline )
1256 {
1257 log<text_log>( "timed out waiting " + std::to_string( m_writeCompletionTimeout ) +
1258 " sec for the writer thread to finish frame " + std::to_string( saveStopFrameNo ),
1260 return false;
1261 }
1262
1263 if( now >= nextLog )
1264 {
1265 log<text_log>( "still waiting for the writer thread to finish frame " + std::to_string( saveStopFrameNo ),
1267 nextLog = now + std::chrono::seconds( 1 );
1268 }
1269
1270 std::this_thread::sleep_for( std::chrono::milliseconds( 50 ) );
1271 }
1272
1273 return true;
1274}
1275
1277{
1278 o->fgThreadExec();
1279}
1280
1282{
1284
1285 // Wait for the thread starter to finish initializing this thread.
1286 while( m_fgThreadInit == true && m_shutdown == 0 )
1287 {
1288 sleep( 1 );
1289 }
1290
1291 timespec missing_ts;
1292
1293 IMAGE image;
1294 ino_t inode = 0; // The inode of the image stream file
1295
1296 bool opened = false;
1297
1298 while( m_shutdown == 0 )
1299 {
1300 /* Initialize ImageStreamIO
1301 */
1302 opened = false;
1303 m_restart = false; // Set this up front, since we're about to restart.
1304
1305 sem_t *sem{ nullptr }; ///< The semaphore to monitor for new image data
1306
1307 int logged = 0;
1308 while( !opened && !m_shutdown && !m_restart )
1309 {
1310 // b/c ImageStreamIO prints every single time, and latest version don't support stopping it yet,
1311 // and that isn't thread-safe-able anyway
1312 // we do our own checks. This is the same code in ImageStreamIO_openIm...
1313 int SM_fd;
1314 char SM_fname[200];
1315 ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1316 SM_fd = open( SM_fname, O_RDWR );
1317 if( SM_fd == -1 )
1318 {
1319 if( !logged )
1320 {
1321 log<text_log>( "ImageStream " + m_shmimName + " not found (yet). Retrying . . .",
1323 }
1324 logged = 1;
1325 sleep( 1 ); // be patient
1326 continue;
1327 }
1328
1329 // Found and opened, close it and then use ImageStreamIO
1330 logged = 0;
1331 close( SM_fd );
1332
1333 if( ImageStreamIO_openIm( &image, m_shmimName.c_str() ) == 0 )
1334 {
1335 if( image.md[0].sem < SEMAPHORE_MAXVAL )
1336 {
1338 mx::sys::sleep( 1 ); // We just need to wait for the server process to finish startup.
1339 }
1340 else
1341 {
1342 opened = true;
1343
1344 char SM_fname[200];
1345 ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1346
1347 struct stat buffer;
1348 int rv = stat( SM_fname, &buffer );
1349
1350 if( rv != 0 )
1351 {
1353 __LINE__,
1354 errno,
1355 "Could not get inode for " + m_shmimName +
1356 ". Source process will need to be restarted." } );
1358 return;
1359 }
1360 inode = buffer.st_ino;
1361 }
1362 }
1363 else
1364 {
1365 mx::sys::sleep( 1 ); // be patient
1366 }
1367 }
1368
1369 if( m_restart )
1370 continue; // this is kinda dumb. we just go around on restart, so why test in the while loop at all?
1371
1372 if( m_shutdown || !opened )
1373 {
1374 if( !opened )
1375 return;
1376
1378 return;
1379 }
1380
1381 // now get a good semaphore
1383 ImageStreamIO_getsemwaitindex( &image, m_semaphoreNumber ); // ask for semaphore we had before
1384
1385 if( m_semaphoreNumber < 0 )
1386 {
1388 { __FILE__,
1389 __LINE__,
1390 "No valid semaphore found for " + m_shmimName + ". Source process will need to be restarted." } );
1391 return;
1392 }
1393
1395 __LINE__,
1396 "got semaphore index " + std::to_string( m_semaphoreNumber ) + " for " + m_shmimName } );
1397
1399
1400 sem = image.semptr[m_semaphoreNumber]; ///< The semaphore to monitor for new image data
1401
1402 m_dataType = image.md[0].datatype;
1404 m_width = image.md[0].size[0];
1405 m_height = image.md[0].size[1];
1406 size_t length;
1407 if( image.md[0].naxis == 3 )
1408 {
1409 length = image.md[0].size[2];
1410 }
1411 else
1412 {
1413 length = 1;
1414 }
1415 std::cerr << "connected" << " " << m_width << "x" << m_height << "x" << (int)m_dataType << " (" << m_typeSize
1416 << ")" << std::endl;
1417
1418 // Now allocate the circBuffs
1419 if( allocate_circbufs() < 0 )
1420 {
1421 return; // will cause shutdown!
1422 }
1423
1424 // And allocate the xrifs
1425 if( allocate_xrif() < 0 )
1426 {
1427 return; // Will cause shutdown!
1428 }
1429
1430 // Buffer and XRIF setup can take long enough for monitor streams to accumulate
1431 // stale semaphore posts. Flush again and re-baseline counters so we do not
1432 // mistake startup backlog for repeated identical frames.
1434
1435 uint8_t atype;
1436 size_t snx, sny, snz;
1437 bool useFrameArrays = image.cntarray != nullptr && length > 1;
1438 bool useCnt1 = length > 1;
1439 bool streamIsCube = image.md[0].naxis == 3;
1441 uint64_t maxChunkTimeNs = static_cast<uint64_t>( m_maxChunkTime * 1e9 );
1442
1443 uint64_t curr_image; // The current cnt1 index
1444 m_currImage = 0;
1445 m_currChunkStart = 0;
1446 m_nextChunkStart = 0;
1447
1448 // Initialize curr_image after the post-setup flush.
1449 if( useCnt1 )
1450 {
1451 curr_image = image.md[0].cnt1;
1452 }
1453 else
1454 {
1455 curr_image = 0;
1456 }
1457
1458 uint64_t last_cnt0; // = ((uint64_t)-1);
1459
1460 // so we can initialize last_cnt0 to avoid frame skip on startup
1461 if( useFrameArrays )
1462 {
1463 last_cnt0 = image.cntarray[curr_image];
1464 }
1465 else
1466 {
1467 last_cnt0 = image.md[0].cnt0;
1468 }
1469
1470 int cnt0flag = 0;
1471
1472 bool restartWriting = false; // flag to prevent logging on a logging restart
1473
1474 // This is the main image grabbing loop.
1475 while( !m_shutdown && !m_restart )
1476 {
1477 timespec ts;
1479
1480 if( sem_timedwait( sem, &ts ) == 0 )
1481 {
1482 // Drain
1483 while( sem_trywait( sem ) == 0 )
1484 {
1485 }
1486
1487 if( errno != EAGAIN && errno != EINTR )
1488 {
1489 if( !m_shutdown && !m_restart )
1490 {
1491 log<software_error>( { __FILE__, __LINE__, errno, "sem_trywait" } );
1492 }
1493 break;
1494 }
1495
1496 if( useCnt1 )
1497 {
1498 curr_image = image.md[0].cnt1;
1499 }
1500 else
1501 {
1502 curr_image = 0;
1503 }
1504
1505 atype = image.md[0].datatype;
1506 snx = image.md[0].size[0];
1507 sny = image.md[0].size[1];
1508 if( streamIsCube )
1509 {
1510 snz = image.md[0].size[2];
1511 }
1512 else
1513 {
1514 snz = 1;
1515 }
1516
1517 if( atype != m_dataType || snx != m_width || sny != m_height || snz != length )
1518 {
1519 break; // exit the nearest while loop and get the new image setup.
1520 }
1521
1522 if( m_shutdown || m_restart )
1523 {
1524 break; // Check for exit signals
1525 }
1526
1528 if( useFrameArrays )
1529 {
1530 new_cnt0 = image.cntarray[curr_image];
1531 }
1532 else
1533 {
1534 new_cnt0 = image.md[0].cnt0;
1535 }
1536
1537 // clang-format off
1538 #ifdef SW_DEBUG
1539 std::cerr << "new_cnt0: " << new_cnt0 << "\n";
1540 #endif
1541 // clang-format on
1542
1543 if( new_cnt0 == last_cnt0 )
1544 {
1546 ++cnt0flag;
1547 if( cnt0flag > 10 ) // Because of the drain this shouldn't happen
1548 {
1549 m_restart = true; // if we get here 10 times then something else is wrong.
1550 }
1551 continue;
1552 }
1553
1554 if( new_cnt0 - last_cnt0 > 1 ) //<- this is what we want to check.
1555 {
1557 }
1558
1559 cnt0flag = 0;
1560
1562
1564 char *curr_src = reinterpret_cast<char *>( image.array.raw ) + curr_image * frameBytes;
1565
1567
1569
1570 if( useFrameArrays )
1571 {
1572 curr_timing[0] = image.cntarray[curr_image];
1573 curr_timing[1] = image.atimearray[curr_image].tv_sec;
1574 curr_timing[2] = image.atimearray[curr_image].tv_nsec;
1575 curr_timing[3] = image.writetimearray[curr_image].tv_sec;
1576 curr_timing[4] = image.writetimearray[curr_image].tv_nsec;
1577 }
1578 else
1579 {
1580 curr_timing[0] = image.md[0].cnt0;
1581 curr_timing[1] = image.md[0].atime.tv_sec;
1582 curr_timing[2] = image.md[0].atime.tv_nsec;
1583 curr_timing[3] = image.md[0].writetime.tv_sec;
1584 curr_timing[4] = image.md[0].writetime.tv_nsec;
1585 }
1586
1587 // Check if we need to time-stamp ourselves -- for old cacao streams
1588 if( curr_timing[1] == 0 )
1589 {
1590
1592 {
1593 log<software_critical>( { __FILE__, __LINE__, errno, 0, "clock_gettime" } );
1594 return;
1595 }
1596
1597 curr_timing[1] = missing_ts.tv_sec;
1598 curr_timing[2] = missing_ts.tv_nsec;
1599 }
1600
1601 // just set w-time to a-time if it's missing
1602 if( curr_timing[3] == 0 )
1603 {
1604 curr_timing[3] = curr_timing[1];
1605 curr_timing[4] = curr_timing[2];
1606 }
1607
1608 m_currImageTime = curr_timing[3] * 1000000000ULL + curr_timing[4];
1609
1610 if( m_shutdown && m_writing == WRITING )
1611 {
1614 }
1615
1616 switch( m_writing )
1617 {
1618 case START_WRITING:
1619
1623
1624 if( !restartWriting && !m_resumeAfterReconnect ) // We only log if this is really a start
1625 {
1626 log<saving_start>( { 1, new_cnt0 } );
1627 }
1628 else // on a restart after a timeout or reconnect we don't log
1629 {
1630 restartWriting = false;
1631 m_resumeAfterReconnect = false;
1632 }
1633
1635
1636 // fall through
1637 case WRITING:
1639 {
1643
1644#ifdef SW_DEBUG
1645 std::cerr << __FILE__ << " " << __LINE__ << " WRITING " << m_currImage << " "
1646 << m_nextChunkStart << " "
1647 << ( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 ) << " "
1649 << "\n";
1650#endif
1651
1652 // Now tell the writer to get going
1653 m_writePending = true;
1654 if( sem_post( &m_swSemaphore ) < 0 )
1655 {
1656 m_writePending = false;
1657 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1658 return;
1659 }
1660
1663 {
1664 m_nextChunkStart = 0;
1665 }
1666
1669 }
1671 {
1675
1676 // clang-format off
1677 #ifdef SW_DEBUG
1678 std::cerr << __FILE__ << " " << __LINE__ << " IMAGE TIME WRITING " << m_currImage << " "
1679 << m_nextChunkStart << " "
1680 << ( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 ) << " "
1682 << "\n";
1683 #endif
1684 // clang-format on
1685
1686 // Now tell the writer to get going
1687 m_writePending = true;
1688 if( sem_post( &m_swSemaphore ) < 0 )
1689 {
1690 m_writePending = false;
1691 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1692 return;
1693 }
1694
1696 restartWriting = true;
1697 }
1698 break;
1699
1700 case STOP_WRITING:
1704
1705 // clang-format off
1706 #ifdef SW_DEBUG
1707 std::cerr << __FILE__ << " " << __LINE__ << " STOP_WRITING\n";
1708 #endif
1709 // clang-format on
1710
1711 // Now tell the writer to get going
1712 m_writePending = true;
1714 if( sem_post( &m_swSemaphore ) < 0 )
1715 {
1716 m_writePending = false;
1717 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1718 return;
1719 }
1720 restartWriting = false;
1721 break;
1722
1723 default:
1724 break;
1725 }
1726
1727 ++m_currImage;
1729 {
1730 m_currImage = 0;
1731 }
1732 }
1733 else
1734 {
1735 // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1736 // we can also get here if a signal interrupts the sem wait which is triggered by INDI callbacks
1737 switch( m_writing )
1738 {
1739 case WRITING:
1740 // Here, if there is at least 1 image, we check for delta-time > m_maxChunkTime
1741 // then write
1742 if( ( m_currImage - m_nextChunkStart > 0 ) &&
1743 ( static_cast<uint64_t>( mx::sys::get_curr_time() * 1e9 ) - m_currChunkStartTime >
1744 maxChunkTimeNs ) )
1745 {
1749
1750#ifdef SW_DEBUG
1751 std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT WRITING " << " " << m_currImage << " "
1752 << m_nextChunkStart << " " << ( m_currImage - m_nextChunkStart ) << " " << last_cnt0
1753 << "\n";
1754#endif
1755
1756 // Now tell the writer to get going
1757 m_writePending = true;
1758 if( sem_post( &m_swSemaphore ) < 0 )
1759 {
1760 m_writePending = false;
1761 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1762 return;
1763 }
1764
1766 restartWriting = true;
1767 }
1768 break;
1769 case STOP_WRITING:
1770 if( mx::sys::get_curr_time() >= m_stopWriteDeadline )
1771 {
1772 // If the stop timer expires before the next frame arrives, flush the current chunk and
1773 // settle back to the idle state exactly as we would on the next frame.
1777
1778#ifdef SW_DEBUG
1779 std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT STOP_WRITING\n";
1780#endif
1781
1782 // Now tell the writer to get going
1783 m_writePending = true;
1785 if( sem_post( &m_swSemaphore ) < 0 )
1786 {
1787 m_writePending = false;
1788 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1789 return;
1790 }
1791 restartWriting = false;
1792 }
1793 break;
1794 default:
1795 break;
1796 }
1797
1798 if( image.md[0].sem <= 0 )
1799 {
1800 break; // Indicates that the server has cleaned up.
1801 }
1802
1803 // Check for why we timed out
1804 if( errno == EINTR )
1805 {
1806 break; // This will indicate time to shutdown, loop will exit normally flags set.
1807 }
1808
1809 // ETIMEDOUT just means we should wait more.
1810 // Otherwise, report an error.
1811 if( errno != ETIMEDOUT )
1812 {
1813 log<software_error>( { __FILE__, __LINE__, errno, "sem_timedwait" } );
1814 break;
1815 }
1816
1817 // Check if the file has disappeared.
1818 int SM_fd;
1819 char SM_fname[200];
1820 ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1821 SM_fd = open( SM_fname, O_RDWR );
1822 if( SM_fd == -1 )
1823 {
1824 m_restart = true;
1825 }
1826 close( SM_fd );
1827
1828 // Check if the inode changed
1829 struct stat buffer;
1830 int rv = stat( SM_fname, &buffer );
1831 if( rv != 0 )
1832 {
1833 m_restart = true;
1834 }
1835
1836 if( buffer.st_ino != inode )
1837 {
1838#ifdef SW_DEBUG
1839 std::cerr << "Restarting due to inode . . . \n";
1840#endif
1841 m_restart = true;
1842 }
1843 }
1844 }
1845
1846 const bool resumeAfterReconnect = ( m_writing == WRITING );
1847
1849 {
1850 m_shutdown = 1;
1851 return;
1852 }
1853
1854 ///\todo might still be writing here, so must check
1855 // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1857 {
1858 // Here, if there is at least 1 image, then write
1859 if( ( m_currImage - m_nextChunkStart > 0 ) )
1860 {
1864
1868
1869 std::cerr << __FILE__ << " " << __LINE__ << " WRITING ON RESTART " << last_cnt0 << "\n";
1870 // Now tell the writer to get going
1871 m_writePending = true;
1872 if( sem_post( &m_swSemaphore ) < 0 )
1873 {
1874 m_writePending = false;
1875 log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1876 return;
1877 }
1878 }
1879 else if( resumeAfterReconnect )
1880 {
1884 }
1885 else
1886 {
1888 m_resumeAfterReconnect = false;
1889 }
1890
1892 {
1893 m_shutdown = 1;
1894 return;
1895 }
1896 }
1897
1899
1900 if( opened )
1901 {
1902 if( m_semaphoreNumber >= 0 )
1903 {
1904 ///\todo is this release necessary with closeIM?
1905 image.semReadPID[m_semaphoreNumber] = 0; // release semaphore
1906 }
1908 opened = false;
1909 }
1910
1911 } // outer loop, will exit if m_shutdown==true
1912
1913 // One more check
1914 if( m_rawImageCircBuff )
1915 {
1918 }
1919
1920 if( m_timingCircBuff )
1921 {
1923 m_timingCircBuff = 0;
1924 }
1925
1926 if( opened )
1927 {
1928 if( m_semaphoreNumber >= 0 )
1929 {
1930 ///\todo is this release necessary with closeIM?
1931 image.semReadPID[m_semaphoreNumber] = 0; // release semaphore.
1932 }
1933
1935 }
1936}
1937
1939{
 // Thread entry trampoline: forwards to swThreadExec() on the object pointer `s`.
 // (The signature line of this function is not visible in this listing.)
1940 s->swThreadExec();
1941}
1942
1944{
 // Stream writer thread body. (The signature line is not visible in this listing.)
 //
 // After start-up synchronization, loops until shutdown: waits for the app to be in the READY or
 // OPERATING state, then waits on m_swSemaphore for up to m_semWaitNSec nanoseconds. Each successful
 // wait runs doEncode() once. Returning from this function (on clock or encode failure) is treated by
 // the existing comments as triggering a shutdown.
1946
1947 // Wait for the thread starter to finish initializing this thread.
1948 while( m_swThreadInit == true && m_shutdown == 0 )
1949 {
1950 sleep( 1 );
1951 }
1952
1953 while( !m_shutdown )
1954 {
 // Idle in one-second steps until the app state allows writing.
1955 while( !shutdown() && ( !( state() == stateCodes::READY || state() == stateCodes::OPERATING ) ) )
1956 {
1957 sleep( 1 );
1958 }
1959
1960 if( shutdown() )
1961 {
1962 break;
1963 }
1964
 // sem_timedwait() takes an absolute CLOCK_REALTIME deadline: now + m_semWaitNSec.
1965 timespec ts;
1966
1967 if( clock_gettime( CLOCK_REALTIME, &ts ) < 0 )
1968 {
1969 log<software_critical>( { __FILE__, __LINE__, errno, 0, "clock_gettime" } );
1970 return; // will trigger a shutdown
1971 }
1972
1973 mx::sys::timespecAddNsec( ts, m_semWaitNSec );
1974
1975 if( sem_timedwait( &m_swSemaphore, &ts ) == 0 )
1976 {
 // One post from the frame grabber thread corresponds to one encode-and-write pass.
1977 if( doEncode() < 0 )
1978 {
1979 log<software_critical>( { __FILE__, __LINE__, "error encoding data" } );
1980 return;
1981 }
1982 // Otherwise, success, and we just go on.
1983 }
1984 else
1985 {
1986 // Check for why we timed out
1987 if( errno == EINTR )
1988 {
1989 continue; // This will probably indicate time to shutdown, loop will exit normally if flags set.
1990 }
1991
1992 // ETIMEDOUT just means we should wait more.
1993 // Otherwise, report an error.
1994 if( errno != ETIMEDOUT )
1995 {
1996 log<software_error>( { __FILE__, __LINE__, errno, "sem_timedwait" } );
1997 break;
1998 }
1999 }
2000 } // outer loop, will exit if m_shutdown==true
2001}
2002
2004{
 // Encode the pending window of the circular buffers with xrif and write it to one file.
 // (The signature line and a number of statements -- including most xrif calls and the declaration of
 // saveStart -- are not visible in this listing; notes below cover only the visible lines.)
 //
 // Visible flow:
 //  - Nothing to do when m_writing == NOT_WRITING: clear m_writePending and return 0.
 //  - nFrames = m_currSaveStop - saveStart. If zero, record telemetry, handle the STOP_WRITING
 //    transition, clear m_writePending and return 0 without touching the disk.
 //  - Otherwise configure/encode the image and timing xrif handles (xrif errors are logged and
 //    processing continues), build the output path from the first frame's acquisition time, create
 //    the directory, and write: image header, image data, timing header, timing data.
 //  - Record telemetry, handle the STOP_WRITING transition, clear m_writePending.
 //
 // Returns 0 on success. The visible failure paths (path construction, directory creation, fopen)
 // clear m_writePending and return the value produced by log<..., -1>().
2005 if( m_writing == NOT_WRITING )
2006 {
2007 m_writePending = false;
2008 return 0;
2009 }
2010
2011 recordSavingState( true );
2012
2013 // Record these to prevent a change in other thread
2016 size_t nFrames = m_currSaveStop - saveStart;
 // Bytes per frame.
2017 size_t nBytes = m_width * m_height * m_typeSize;
2018
2019#ifdef SW_DEBUG
2020 std::cerr << "nFrames: " << nFrames << "\n";
2021#endif
2022
2023 if( nFrames == 0 ) // can happen during a stop. just clean up, but don't try to write anything.
2024 {
2025#ifdef SW_DEBUG
2026 std::cerr << "nothing to write\n";
2027#endif
2028
2029 recordSavingStats( true );
2030
2031 if( m_writing == STOP_WRITING )
2032 {
2034 {
2036 }
2037 else
2038 {
2041 }
2043 }
2044
2045 recordSavingState( true );
2046
2047 m_writePending = false;
2048 return 0;
2049 }
2050 // Configure xrif and copy image data -- this does no allocations
2052 if( rv != XRIF_NOERROR )
2053 {
2054 // This is a big problem. Report it as "ALERT" and go on.
2055 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST" } );
2056 }
2057
2059 if( rv != XRIF_NOERROR )
2060 {
2061 // This may just be out of range, it's only an error.
2062 log<software_error>( { __FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error." } );
2063 }
2064
2066
2067 // Configure xrif and copy timing data -- no allocations
2069 if( rv != XRIF_NOERROR )
2070 {
2071 // This is a big problem. Report it as "ALERT" and go on.
2072 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST." } );
2073 }
2074
2076 if( rv != XRIF_NOERROR )
2077 {
2078 // This may just be out of range, it's only an error.
2079 log<software_error>( { __FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error." } );
2080 }
2081
2082#ifdef SW_DEBUG
2083 for( size_t nF = 0; nF < nFrames; ++nF )
2084 {
2085 std::cerr << " " << ( m_timingCircBuff + saveStart * 5 + nF * 5 )[0] << "\n";
2086 }
2087#endif
2088
 // Each frame has five uint64_t timing values, so the window starts at saveStart * 5.
2089 memcpy( m_xrif_timing->raw_buffer, m_timingCircBuff + saveStart * 5, nFrames * 5 * sizeof( uint64_t ) );
2090
2091 rv = xrif_encode( m_xrif );
2092 if( rv != XRIF_NOERROR )
2093 {
2094 // This is a big problem. Report it as "ALERT" and go on.
2095 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST." } );
2096 }
2097
2099 if( rv != XRIF_NOERROR )
2100 {
2101 // This is a big problem. Report it as "ALERT" and go on.
2102 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST." } );
2103 }
2104
2106 if( rv != XRIF_NOERROR )
2107 {
2108 // This is a big problem. Report it as "ALERT" and go on.
2109 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST." } );
2110 }
2111
2113 if( rv != XRIF_NOERROR )
2114 {
2115 // This is a big problem. Report it as "ALERT" and go on.
2116 log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST" } );
2117 }
2118
2119 // Now break down the acq time of the first image in the buffer for use in file name
2120 // tm uttime; // The broken down time.
 // NOTE(review): this views timing entries [1] and [2] of the first frame (two consecutive uint64_t)
 // as a timespec. That assumes timespec is laid out as two 64-bit fields -- TODO confirm for all targets.
2121 timespec *fts = reinterpret_cast<timespec *>( m_timingCircBuff + saveStart * 5 + 1 );
2122
2123 std::string fileName;
2124 std::string relPath;
2125 mx::error_t errc = file::fileTimeRelPath( fileName, relPath, m_outName, "xrif", fts->tv_sec, fts->tv_nsec );
2126 if( errc != mx::error_t::noerror )
2127 {
 // NOTE(review): the message text says "fileTimeRePath" while the function called is fileTimeRelPath.
2128 std::string msg = "error from file::fileTimeRePath: ";
2129 msg += mx::errorMessage( errc );
2130 msg += " (" + std::string( mx::errorName( errc ) ) + ")";
2131 m_writePending = false;
2132 return log<software_error, -1>( { __FILE__, __LINE__, msg } );
2133 }
2134
2135 std::string fullPath = m_rawimageDir + '/' + relPath;
2136
2137 try
2138 {
2139 std::filesystem::create_directories( fullPath ); // this does nothing if fname already exists
2140 }
2141 catch( const std::filesystem::filesystem_error &e )
2142 {
2143 std::string msg = "filesystem_error from std::create_directories. ";
2144 msg += e.what();
2145 msg += " code: ";
 // NOTE(review): e.code().value() is an int, so this += selects the single-character overload and
 // appends one char with that numeric value rather than the decimal digits.
 // std::to_string( e.code().value() ) is presumably what was intended.
2146 msg += e.code().value();
2147 m_writePending = false;
2148 return log<software_critical, -1>( { __FILE__, __LINE__, msg } );
2149 }
2150 catch( const std::exception &e )
2151 {
2152 std::string msg = "exception from std::create_directories. ";
2153 msg += e.what();
2154 m_writePending = false;
2155 return log<software_critical, -1>( { __FILE__, __LINE__, msg } );
2156 }
2157
 // m_outFilePath is assigned on a line not visible in this listing.
2159 FILE *fp_xrif = fopen( m_outFilePath.c_str(), "wb" );
2160 if( fp_xrif == NULL )
2161 {
2162 // This is it. If we can't write data to disk need to fix.
2163 m_writePending = false;
2164 return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "failed to open file for writing" } );
2165 }
2166
 // File layout as written below: image header, image data, timing header, timing data.
 // A short write is logged but does not stop the remaining writes.
2167 size_t bw = fwrite( m_xrif_header, sizeof( uint8_t ), XRIF_HEADER_SIZE, fp_xrif );
2168
2169 if( bw != XRIF_HEADER_SIZE )
2170 {
2172 __LINE__,
2173 errno,
2174 0,
2175 "failure writing header to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
2176 // We go on . . .
2177 }
2178
2179 bw = fwrite( m_xrif->raw_buffer, sizeof( uint8_t ), m_xrif->compressed_size, fp_xrif );
2180
2181 if( bw != m_xrif->compressed_size )
2182 {
2184 __LINE__,
2185 errno,
2186 0,
2187 "failure writing data to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
2188 }
2189
2191
2192 if( bw != XRIF_HEADER_SIZE )
2193 {
2195 { __FILE__,
2196 __LINE__,
2197 errno,
2198 0,
2199 "failure writing timing header to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
2200 }
2201
2202 bw = fwrite( m_xrif_timing->raw_buffer, sizeof( uint8_t ), m_xrif_timing->compressed_size, fp_xrif );
2203
2204 if( bw != m_xrif_timing->compressed_size )
2205 {
2207 { __FILE__,
2208 __LINE__,
2209 errno,
2210 0,
2211 "failure writing timing data to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
2212 }
2213
 // NOTE(review): the return value of fclose() is not checked, so a failure to flush buffered data
 // at close would go unreported.
2214 fclose( fp_xrif );
2215
2216 recordSavingStats( true );
2217
2218 if( m_writing == STOP_WRITING )
2219 {
2221 {
2223 }
2224 else
2225 {
2228 }
2230 }
2231
2232 recordSavingState( true );
2233
 // Hand the save window back: waitForWriteCompletion() polls this flag.
2234 m_writePending = false;
2235 return 0;
2236
2237} // doEncode
2238
/** Callback for a new-property request on the m_indiP_writing toggle switch.
 *
 * - "toggle" Off while m_writing is WRITING or START_WRITING: request a stop (STOP_WRITING) and set
 *   m_stopWriteDeadline to now + m_writeStopTimeout.
 * - "toggle" On while m_writing is NOT_WRITING: request a start (START_WRITING) and clear the deadline.
 * - Any other combination leaves the state unchanged.
 *
 * Both transitions clear m_resumeAfterReconnect. This callback only sets the request state.
 *
 * \returns 0 on every path visible here (INDI_VALIDATE_CALLBACK_PROPS may return on its own -- its
 * expansion is not visible in this listing).
 */
2239INDI_NEWCALLBACK_DEFN( streamWriter, m_indiP_writing )
2240( const pcf::IndiProperty &ipRecv )
2241{
2242 INDI_VALIDATE_CALLBACK_PROPS( m_indiP_writing, ipRecv );
2243
 // Ignore requests that do not carry the "toggle" element.
2244 if( !ipRecv.find( "toggle" ) )
2245 {
2246 return 0;
2247 }
2248
2249 if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::Off &&
2250 ( m_writing == WRITING || m_writing == START_WRITING ) )
2251 {
2252 m_writing = STOP_WRITING;
2253 m_resumeAfterReconnect = false;
2254 m_stopWriteDeadline = mx::sys::get_curr_time() + m_writeStopTimeout;
2255 }
2256
2257 if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::On && m_writing == NOT_WRITING )
2258 {
2259 m_writing = START_WRITING;
2260 m_resumeAfterReconnect = false;
2261 m_stopWriteDeadline = 0;
2262 }
2263
2264 return 0;
2265}
2266
2268{
 // Publish the writing toggle and the xrif statistics over INDI.
 // (The signature line, the guarding condition after the comment below, and the first line of several
 // multi-line indi::updateIfChanged calls are not visible in this listing.)
2269 // Only update this if not changing
2271 {
 // While writing (or about to): toggle On, plus the compression ratio and the per-stage rates.
 // Each rate is published twice: divided by 1048576.0 (the "...MBsec" elements) and divided by the
 // bytes per frame, m_width * m_height * m_typeSize (the "...FPS" elements).
2272 if( m_xrif && ( m_writing == WRITING || m_writing == START_WRITING ) )
2273 {
2274 indi::updateSwitchIfChanged( m_indiP_writing, "toggle", pcf::IndiElement::On, m_indiDriver, INDI_OK );
2275 indi::updateIfChanged( m_indiP_xrifStats, "ratio", m_xrif->compression_ratio, m_indiDriver, INDI_BUSY );
2277 m_indiP_xrifStats, "encodeMBsec", m_xrif->encode_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2279 "encodeFPS",
2280 m_xrif->encode_rate / ( m_width * m_height * m_typeSize ),
2282 INDI_BUSY );
2284 m_indiP_xrifStats, "differenceMBsec", m_xrif->difference_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2286 "differenceFPS",
2287 m_xrif->difference_rate / ( m_width * m_height * m_typeSize ),
2289 INDI_BUSY );
2291 m_indiP_xrifStats, "reorderMBsec", m_xrif->reorder_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2293 "reorderFPS",
2294 m_xrif->reorder_rate / ( m_width * m_height * m_typeSize ),
2296 INDI_BUSY );
2298 m_indiP_xrifStats, "compressMBsec", m_xrif->compress_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2300 "compressFPS",
2301 m_xrif->compress_rate / ( m_width * m_height * m_typeSize ),
2303 INDI_BUSY );
2304 }
2305 else
2306 {
 // Not writing (or no xrif handle): toggle Off. The remaining statements of this branch are not visible here.
2307 indi::updateSwitchIfChanged( m_indiP_writing, "toggle", pcf::IndiElement::Off, m_indiDriver, INDI_OK );
2317 }
2318 }
2319}
2320
2325
2327{
 // Forces a saving-state telemetry record by delegating to recordSavingState( true ).
 // (Signature line not visible in this listing; presumably a recordTelem overload -- TODO confirm.)
2328 return recordSavingState( true );
2329}
2330
2332{
 // Record the current saving state, tracking the last recorded values in function-local statics.
 // (The signature line, the first part of the `if` condition that selects state = 1, the change-detection
 // condition, and the telemetry call itself are not visible in this listing.)
 //
 // Always returns 0 on the visible paths.
 //
 // Function-local statics: shared by every call for the lifetime of the process. Both start at -1
 // (all bits set for the unsigned one).
2333 static int16_t lastState = -1;
2334 static uint64_t currSaveStart = -1;
2335
 // state is 1 when the (partly hidden) condition holds -- its visible tail is m_writing == STOP_WRITING --
 // and 0 otherwise.
2336 int16_t state;
2338 m_writing == STOP_WRITING ) // Changed from just writing 5/2024
2339 state = 1;
2340 else
2341 state = 0;
2342
2344 {
2346
2347 lastState = state;
2349 }
2350
2351 return 0;
2352}
2353
2355{
 // Emit a telem_saving record with the current xrif sizes and rates when any of them differs from
 // the last emitted values, or unconditionally when `force` is true. Always returns 0.
 // (The signature line is not visible in this listing.)
 //
 // NOTE(review): m_xrif is dereferenced here without a null check, whereas updateINDI tests m_xrif
 // before using it -- confirm that callers only reach this function once the handle exists.
 //
 // Last emitted values, kept in function-local statics. The -1 initializers convert to the maximum
 // value for the unsigned fields.
2356 static uint32_t last_rawSize = -1;
2357 static uint32_t last_compressedSize = -1;
2358 static float last_encodeRate = -1;
2359 static float last_differenceRate = -1;
2360 static float last_reorderRate = -1;
2361 static float last_compressRate = -1;
2362
2363 if( m_xrif->raw_size != last_rawSize || m_xrif->compressed_size != last_compressedSize ||
2364 m_xrif->encode_rate != last_encodeRate || m_xrif->difference_rate != last_differenceRate ||
2365 m_xrif->reorder_rate != last_reorderRate || m_xrif->compress_rate != last_compressRate || force )
2366 {
 // Sizes are narrowed to uint32_t and rates to float for the telemetry record.
2367 telem<telem_saving>( { (uint32_t)m_xrif->raw_size,
2368 (uint32_t)m_xrif->compressed_size,
2369 (float)m_xrif->encode_rate,
2370 (float)m_xrif->difference_rate,
2371 (float)m_xrif->reorder_rate,
2372 (float)m_xrif->compress_rate } );
2373
2374 last_rawSize = m_xrif->raw_size;
2375 last_compressedSize = m_xrif->compressed_size;
2376 last_encodeRate = m_xrif->encode_rate;
2377 last_differenceRate = m_xrif->difference_rate;
2378 last_reorderRate = m_xrif->reorder_rate;
2379 last_compressRate = m_xrif->compress_rate;
2380 }
2381
2382 return 0;
2383}
2384
2385} // namespace app
2386} // namespace MagAOX
2387
2388#endif
The base-class for XWCTk applications.
std::string basePath()
Get the.
stateCodes::stateCodeT state()
Get the current state code.
int registerIndiPropertyNew(pcf::IndiProperty &prop, int(*)(void *, const pcf::IndiProperty &))
Register an INDI property which is exposed for others to request a New Property for.
int createStandardIndiToggleSw(pcf::IndiProperty &prop, const std::string &name, const std::string &label="", const std::string &group="")
Create a standard R/W INDI switch with a single toggle element.
int m_shutdown
Flag to signal it's time to shutdown. When not 0, the main loop exits.
int shutdown()
Get the value of the shutdown flag.
indiDriver< MagAOXApp > * m_indiDriver
The INDI driver wrapper. Constructed and initialized by execute, which starts and stops communication...
static int log(const typename logT::messageT &msg, logPrioT level=logPrio::LOG_DEFAULT)
Make a log entry.
int threadStart(std::thread &thrd, bool &thrdInit, pid_t &tpid, pcf::IndiProperty &thProp, int thrdPrio, const std::string &cpuset, const std::string &thrdName, thisPtr *thrdThis, Function &&thrdStart)
Start a thread, using this class's privileges to set priority, etc.
pcf::IndiProperty m_indiP_xrifStats
double m_skipSummaryIntervalSec
Current interval between summary skip logs.
pcf::IndiProperty m_indiP_writing
std::thread m_fgThread
A separate thread for the actual framegrabbings.
int m_fgThreadPrio
Priority of the framegrabber thread, should normally be > 0.
pid_t m_fgThreadID
F.g. thread PID.
double m_writeStopTimeout
Seconds to wait after a stop-writing command before flushing the pending data without a new frame.
size_t m_circBuffLength
The length of the circular buffer, in frames.
int initialize_xrif()
Initialize the xrif system.
int recordTelem(const telem_saving_state *)
uint8_t m_dataType
The ImageStreamIO type code.
bool m_fgThreadInit
Synchronizer to ensure f.g. thread initializes before doing dangerous things.
int m_typeSize
The pixel byte depth.
bool m_resumeAfterReconnect
Tracks whether reconnect cleanup should resume writing immediately on the replacement stream.
uint64_t m_currImageTime
The write-time of the current image in nanoseconds.
xrif_t m_xrif_timing
The xrif compression handle for image timing data.
uint64_t m_currSaveStart
The circular buffer position at which to start saving.
std::atomic< uint64_t > m_repeatSemaphoreCount
Count of repeated semaphore wakes with unchanged frame count since the last summary log.
virtual int appStartup()
Startup functions.
size_t m_writeChunkLength
The number of frames to write at a time.
size_t m_height
The height of the image.
uint64_t m_currSaveStopFrameNo
The frame number of the image at which saving stopped (for logging)
double m_writeCompletionTimeout
Seconds to wait for the writer thread to finish a queued flush during restart cleanup.
int allocate_circbufs()
Worker function to allocate the circular buffers.
static void getCircBuffLengths(size_t &circBuffLength, double &circBuffSize, size_t &writeChunkLength, size_t maxCircBuffLength, double maxCircBuffSize, size_t maxWriteChunkLength, uint32_t width, uint32_t height, size_t typeSize)
void handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
Handles SIGSEGV and SIGBUS. Sets m_restart to true.
int m_swThreadPrio
Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
int recordSavingStats(bool force=false)
pcf::IndiProperty m_swThreadProp
The property to hold the s.w. thread details.
bool m_warnMissedData
Whether missed-data backlog summaries should be emitted as warnings instead of informational logs.
friend class streamWriter_data_test
uint64_t m_currChunkStartTime
The write-time of the first image in the chunk in nanoseconds.
std::thread m_swThread
A separate thread for the actual writing.
std::atomic< bool > m_writePending
Whether the writer thread still owns a queued save window and may touch the circular buffers.
std::string m_fgCpuset
The cpuset for the framegrabber thread. Ignored if empty (the default).
virtual int appLogic()
Implementation of the FSM for the stream writer.
static void fgThreadStart(streamWriter *s)
Thread starter, called by threadStart on thread construction. Calls fgThreadExec.
void release_circbufs()
Release the circular buffers owned by the framegrabber thread.
std::string m_outName
The name to use for outputting files, Default is m_shmimName.
size_t m_maxCircBuffLength
The maximum length of the circular buffer, in frames.
INDI_NEWCALLBACK_DECL(streamWriter, m_indiP_writing)
std::atomic< uint64_t > m_skippedFrameCount
Count of skipped frames accumulated by the framegrabber thread since the last summary log.
char * m_xrif_timing_header
Storage for the xrif timing data file header.
double m_nextSkipSummaryTime
Time after which the next summary skip log may be emitted.
~streamWriter() noexcept
Destructor.
int allocate_xrif()
Worker function to configure and allocate the xrif handles.
int setSigSegvHandler()
Sets the handler for SIGSEGV and SIGBUS.
std::string m_swCpuset
The cpuset for the stream writer thread. Ignored if empty (the default).
bool m_startWriting
Whether writing should be armed automatically at application startup.
xrif_t m_xrif
The xrif compression handle for image data.
virtual void setupConfig()
Setup the configuration system (called by MagAOXApp::setup())
int m_semaphoreNumber
The image structure semaphore index.
std::string m_rawimageDir
The path where files will be saved.
static void _handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
int recordSavingState(bool force=false)
uint64_t m_currChunkStart
The circular buffer starting position of the current to-be-written chunk.
pid_t m_swThreadID
S.w. thread pid.
bool waitForWriteCompletion(uint64_t saveStopFrameNo)
Wait for the writer thread to finish any queued save work before buffer teardown.
virtual void loadConfig()
load the configuration system results (called by MagAOXApp::setup())
pcf::IndiProperty m_fgThreadProp
The property to hold the f.g. thread details.
void swThreadExec()
Execute the stream writer main loop.
uint64_t m_nextChunkStart
The circular buffer starting position of the next to-be-written chunk.
static streamWriter * m_selfWriter
char * m_xrif_header
Storage for the xrif image data file header.
double m_maxChunkTime
The maximum time before writing regardless of number of frames.
double m_maxCircBuffSize
The maximum size of the circular buffer in MB.
int doEncode()
Function called when semaphore is raised to do the encode and write.
static void swThreadStart(streamWriter *s)
Thread starter, called by threadStart on thread construction. Calls swThreadExec.
uint64_t m_currSaveStop
The circular buffer position at which to stop saving.
void fgThreadExec()
Execute the frame grabber main loop.
std::string m_outFilePath
The full path for the latest output file.
std::string m_shmimName
The name of the shared memory buffer.
dev::telemeter< streamWriter > telemeterT
size_t m_width
The width of the image.
double m_circBuffSize
The size of the circular buffer, in MB.
virtual int appShutdown()
Do any needed shutdown tasks. Currently nothing in this app.
sem_t m_swSemaphore
Semaphore used to synchronize the fg thread and the sw thread.
bool m_swThreadInit
Synchronizer to ensure s.w. thread initializes before doing dangerous things.
#define protected
#define MAGAOX_rawimageRelPath
The relative path to the raw images directory.
Definition paths.hpp:92
#define MAGAOX_env_rawimage
Environment variable setting the relative raw image path.
#define INDI_NEWCALLBACK_DEFN(class, prop)
Define the callback for a new property request.
#define REG_INDI_NEWPROP_NOCB(prop, propName, type)
Register a NEW INDI property with the class, with no callback.
#define INDI_NEWCALLBACK(prop)
Get the name of the static callback wrapper for a new property.
#define INDI_VALIDATE_CALLBACK_PROPS(prop1, prop2)
Standard check for matching INDI properties in a callback.
#define INDI_IDLE
Definition indiUtils.hpp:27
#define INDI_BUSY
Definition indiUtils.hpp:29
#define INDI_OK
Definition indiUtils.hpp:28
void updateIfChanged(pcf::IndiProperty &p, const std::string &el, const T &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
Definition indiUtils.hpp:92
void updateSwitchIfChanged(pcf::IndiProperty &p, const std::string &el, const pcf::IndiElement::SwitchStateType &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
std::stringstream msg
const pcf::IndiProperty & ipRecv
mx::error_t fileTimeRelPath(std::string &tstamp, std::string &relPath, time_t ts_sec, long ts_nsec)
Get the timestamp and the relative path based on a time.
Definition dm.hpp:19
static constexpr logPrioT LOG_NOTICE
A normal but significant condition.
static constexpr logPrioT LOG_INFO
Informational. The info log level is the lowest level recorded during normal operations.
static constexpr logPrioT LOG_CRITICAL
The process can not continue and will shut down (fatal)
static constexpr logPrioT LOG_WARNING
A condition has occurred which may become an error, but the process continues.
static constexpr logPrioT LOG_DEBUG
Used for debugging.
#define XWC_SEM_WAIT_TS_RETVOID(ts, sec, nsec)
Add the wait time to a timespec for a sem_timedwait call, with no value returned on error.
Definition semUtils.hpp:20
#define NOT_WRITING
#define STOP_WRITING
#define WRITING
#define START_WRITING
#define xrif_encode
#define xrif_configure
#define xrif_set_size
#define xrif_allocate_reordered
#define xrif_write_header
#define fwrite
#define xrif_set_lz4_acceleration
#define xrif_allocate_raw
A device base class which saves telemetry.
Definition telemeter.hpp:75
int appShutdown()
Perform telemeter application shutdown.
int loadConfig(appConfigurator &config)
Load the device section from an application configurator.
int appLogic()
Perform telemeter application logic.
int setupConfig(appConfigurator &config)
Setup an application configurator for the device section.
int appStartup()
Starts the telemetry log thread.
int checkRecordTimes(const telT &tel, telTs... tels)
Check the time of the last record for each telemetry type and make an entry if needed.
@ OPERATING
The device is operating, other than homing.
@ READY
The device is ready for operation, but is not operating.
Software CRITICAL log entry.
Software ERR log entry.