Line data Source code
1 : /** \file streamWriter.hpp
2 : * \brief The MagAO-X Image Stream Writer
3 : *
4 : * \author Jared R. Males (jaredmales@gmail.com)
5 : *
6 : * \ingroup streamWriter_files
7 : */
8 :
9 : #ifndef streamWriter_hpp
10 : #define streamWriter_hpp
11 :
12 : #include <filesystem>
13 :
14 : #include <ImageStreamIO/ImageStruct.h>
15 : #include <ImageStreamIO/ImageStreamIO.h>
16 :
17 : #include <xrif/xrif.h>
18 :
19 : #include <mx/sys/timeUtils.hpp>
20 :
21 : #include "../../libMagAOX/libMagAOX.hpp" //Note this is included on command line to trigger pch
22 : #include "../../magaox_git_version.h"
23 :
24 : #define NOT_WRITING ( 0 )
25 : #define START_WRITING ( 1 )
26 : #define WRITING ( 2 )
27 : #define STOP_WRITING ( 3 )
28 :
29 : // #define SW_DEBUG
30 :
31 : namespace MagAOX
32 : {
33 : namespace app
34 : {
35 :
36 : /** \defgroup streamWriter ImageStreamIO Stream Writing
37 : * \brief Writes the contents of an ImageStreamIO image stream to disk.
38 : *
39 : * <a href="../handbook/operating/software/apps/streamWriter.html">Application Documentation</a>
40 : *
41 : * \ingroup apps
42 : *
43 : */
44 :
45 : /** \defgroup streamWriter_files ImageStreamIO Stream Writing
46 : * \ingroup streamWriter
47 : */
48 :
49 : /** MagAO-X application to control writing ImageStreamIO streams to disk.
50 : *
51 : * \ingroup streamWriter
52 : *
53 : */
54 : class streamWriter : public MagAOXApp<>, public dev::telemeter<streamWriter>
55 : {
56 : typedef dev::telemeter<streamWriter> telemeterT;
57 :
58 : friend class dev::telemeter<streamWriter>;
59 :
60 : // Give the test harness access.
61 : friend class streamWriter_test;
62 : friend class streamWriter_data_test;
63 :
64 : protected:
65 : /** \name configurable parameters
66 : *@{
67 : */
68 :
69 : std::string m_rawimageDir; ///< The path where files will be saved.
70 :
71 : size_t m_maxCircBuffLength{ 1024 }; ///< The maximum length of the circular buffer, in frames
72 :
73 : double m_maxCircBuffSize{ 2048 }; ///< The maximum size of the circular bufffer in MB.
74 :
75 : size_t m_maxWriteChunkLength{ 512 }; /**< The maximum number of frames to write at a time. Must
76 : be an integer factor of m_maxCircBuffLength.*/
77 :
78 : double m_maxChunkTime{ 10 }; ///< The maximum time before writing regardless of number of frames.
79 :
80 : std::string m_shmimName; ///< The name of the shared memory buffer.
81 :
82 : std::string m_outName; ///< The name to use for outputting files, Default is m_shmimName.
83 :
84 : int m_semaphoreNumber{ 7 }; ///< The image structure semaphore index.
85 :
86 : unsigned m_semWaitSec{ 0 }; /**< The time in whole sec to wait on the semaphore,
87 : to which m_semWaitNSec is added. Default is 0 nsec.*/
88 :
89 : unsigned m_semWaitNSec{ 500000000 }; /**< The time in nsec to wait on the semaphore, added to m_semWaitSec.
90 : Max is 999999999. Default is 5e8 nsec. */
91 :
92 : int m_lz4accel{ 1 };
93 :
94 : bool m_compress{ true };
95 :
96 : ///@}
97 :
98 : size_t m_circBuffLength{ 1024 }; ///< The length of the circular buffer, in frames
99 : double m_circBuffSize{ 2048.0 }; ///< The size of the circular buffer, in MB
100 : size_t m_writeChunkLength{ 512 }; ///< The number of frames to write at a time
101 :
102 : size_t m_width{ 0 }; ///< The width of the image
103 : size_t m_height{ 0 }; ///< The height of the image
104 : uint8_t m_dataType{ 0 }; ///< The ImageStreamIO type code.
105 : int m_typeSize{ 0 }; ///< The pixel byte depth
106 :
107 : char *m_rawImageCircBuff{ nullptr };
108 : uint64_t *m_timingCircBuff{ nullptr };
109 :
110 : size_t m_currImage{ 0 };
111 :
112 : double m_currImageTime{ 0 }; ///< The write-time of the current image
113 :
114 : double m_currChunkStartTime{ 0 }; ///< The write-time of the first image in the chunk
115 :
116 : // Writer book-keeping:
117 : int m_writing{ NOT_WRITING }; /**< Controls whether or not images are being written,
118 : and sequences start and stop of writing.*/
119 :
120 : uint64_t m_currChunkStart{ 0 }; ///< The circular buffer starting position of the current to-be-written chunk.
121 : uint64_t m_nextChunkStart{ 0 }; ///< The circular buffer starting position of the next to-be-written chunk.
122 :
123 : uint64_t m_currSaveStart{ 0 }; ///< The circular buffer position at which to start saving.
124 : uint64_t m_currSaveStop{ 0 }; ///< The circular buffer position at which to stop saving.
125 :
126 : uint64_t m_currSaveStopFrameNo{ 0 }; ///< The frame number of the image at which saving stopped (for logging)
127 :
128 : /// The xrif compression handle for image data
129 : xrif_t m_xrif{ nullptr };
130 :
131 : /// Storage for the xrif image data file header
132 : char *m_xrif_header{ nullptr };
133 :
134 : /// The xrif compression handle for image data
135 : xrif_t m_xrif_timing{ nullptr };
136 :
137 : /// Storage for the xrif image data file header
138 : char *m_xrif_timing_header{ nullptr };
139 :
140 : std::string m_outFilePath; ///< The full path for the latest output file
141 :
142 : public:
143 : /// Default c'tor
144 : streamWriter();
145 :
146 : /// Destructor
147 : ~streamWriter() noexcept;
148 :
149 : /// Setup the configuration system (called by MagAOXApp::setup())
150 : virtual void setupConfig();
151 :
152 : /// load the configuration system results (called by MagAOXApp::setup())
153 : virtual void loadConfig();
154 :
155 : /// Startup functions
156 : /** Sets up the INDI vars.
157 : *
158 : */
159 : virtual int appStartup();
160 :
161 : /// Implementation of the FSM for the Siglent SDG
162 : virtual int appLogic();
163 :
164 : /// Do any needed shutdown tasks. Currently nothing in this app.
165 : virtual int appShutdown();
166 :
167 : protected:
168 : /** \name SIGSEGV & SIGBUS signal handling
169 : * These signals occur as a result of a ImageStreamIO source server resetting (e.g. changing frame sizes).
170 : * When they occur a restart of the framegrabber and framewriter thread main loops is triggered.
171 : *
172 : * @{
173 : */
174 : bool m_restart{ false };
175 :
176 : static streamWriter *m_selfWriter; ///< Static pointer to this (set in constructor). Used for getting out of the
177 : ///< static SIGSEGV handler.
178 :
179 : /// Initialize the xrif system.
180 : /** Allocates the handles and headers pointers.
181 : *
182 : * \returns 0 on success.
183 : * \returns -1 on error.
184 : */
185 : int initialize_xrif();
186 :
187 : /// Sets the handler for SIGSEGV and SIGBUS
188 : /** These are caused by ImageStreamIO server resets.
189 : */
190 : int setSigSegvHandler();
191 :
192 : /// The handler called when SIGSEGV or SIGBUS is received, which will be due to ImageStreamIO server resets. Just a
193 : /// wrapper for handlerSigSegv.
194 : static void _handlerSigSegv( int signum, siginfo_t *siginf, void *ucont );
195 :
196 : /// Handles SIGSEGV and SIGBUS. Sets m_restart to true.
197 : void handlerSigSegv( int signum, siginfo_t *siginf, void *ucont );
198 : ///@}
199 :
200 : /** \name Framegrabber Thread
201 : * This thread monitors the ImageStreamIO buffer and copies its images to the circular buffer.
202 : *
203 : * @{
204 : */
205 : int m_fgThreadPrio{ 1 }; ///< Priority of the framegrabber thread, should normally be > 00.
206 :
207 : std::string m_fgCpuset; ///< The cpuset for the framegrabber thread. Ignored if empty (the default).
208 :
209 : std::thread m_fgThread; ///< A separate thread for the actual framegrabbings
210 :
211 : bool m_fgThreadInit{ true }; ///< Synchronizer to ensure f.g. thread initializes before doing dangerous things.
212 :
213 : pid_t m_fgThreadID{ 0 }; ///< F.g. thread PID.
214 :
215 : pcf::IndiProperty m_fgThreadProp; ///< The property to hold the f.g. thread details.
216 :
217 : public:
218 : static void getCircBuffLengths( size_t &circBuffLength,
219 : double &circBuffSize,
220 : size_t &writeChunkLength,
221 : size_t maxCircBuffLength,
222 : double maxCircBuffSize,
223 : size_t maxWriteChunkLength,
224 : uint32_t width,
225 : uint32_t height,
226 : size_t typeSize );
227 :
228 : protected:
229 : /// Worker function to allocate the circular buffers.
230 : /** This takes place in the fg thread after connecting to the stream.
231 : *
232 : * \returns 0 on sucess.
233 : * \returns -1 on error.
234 : */
235 : int allocate_circbufs();
236 :
237 : /// Worker function to configure and allocate the xrif handles.
238 : /** This takes place in the fg thread after connecting to the stream.
239 : *
240 : * \returns 0 on sucess.
241 : * \returns -1 on error.
242 : */
243 : int allocate_xrif();
244 :
245 : /// Thread starter, called by fgThreadStart on thread construction. Calls fgThreadExec.
246 : static void fgThreadStart( streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */ );
247 :
248 : /// Execute the frame grabber main loop.
249 : void fgThreadExec();
250 :
251 : ///@}
252 :
253 : /** \name Stream Writer Thread
254 : * This thread writes chunks of the circular buffer to disk.
255 : *
256 : * @{
257 : */
258 : int m_swThreadPrio{ 1 }; ///< Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
259 :
260 : std::string m_swCpuset; ///< The cpuset for the framegrabber thread. Ignored if empty (the default).
261 :
262 : sem_t m_swSemaphore; ///< Semaphore used to synchronize the fg thread and the sw thread.
263 :
264 : std::thread m_swThread; ///< A separate thread for the actual writing
265 :
266 : bool m_swThreadInit{ true }; ///< Synchronizer to ensure s.w. thread initializes before doing dangerous things.
267 :
268 : pid_t m_swThreadID{ 0 }; ///< S.w. thread pid.
269 :
270 : pcf::IndiProperty m_swThreadProp; ///< The property to hold the s.w. thread details.
271 :
272 : /// Thread starter, called by swThreadStart on thread construction. Calls swThreadExec.
273 : static void swThreadStart( streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */ );
274 :
275 : /// Execute the stream writer main loop.
276 : void swThreadExec();
277 :
278 : /// Function called when semaphore is raised to do the encode and write.
279 : int doEncode();
280 : ///@}
281 :
282 : // INDI:
283 : protected:
284 : // declare our properties
285 : pcf::IndiProperty m_indiP_writing;
286 :
287 : pcf::IndiProperty m_indiP_xrifStats;
288 :
289 : public:
290 0 : INDI_NEWCALLBACK_DECL( streamWriter, m_indiP_writing );
291 :
292 : void updateINDI();
293 :
294 : /** \name Telemeter Interface
295 : *
296 : * @{
297 : */
298 : int checkRecordTimes();
299 :
300 : int recordTelem( const telem_saving_state * );
301 :
302 : int recordSavingState( bool force = false );
303 : int recordSavingStats( bool force = false );
304 :
305 : ///@}
306 : };
307 :
308 : // Set self pointer to null so app starts up uninitialized.
309 : streamWriter *streamWriter::m_selfWriter = nullptr;
310 :
311 24 : streamWriter::streamWriter() : MagAOXApp( MAGAOX_CURRENT_SHA1, MAGAOX_REPO_MODIFIED )
312 : {
313 8 : m_powerMgtEnabled = false;
314 :
315 8 : m_selfWriter = this;
316 :
317 8 : return;
318 0 : }
319 :
320 8 : streamWriter::~streamWriter() noexcept
321 : {
322 8 : if( m_xrif )
323 4 : xrif_delete( m_xrif );
324 :
325 8 : if( m_xrif_header )
326 4 : free( m_xrif_header );
327 :
328 8 : if( m_xrif_timing )
329 4 : xrif_delete( m_xrif_timing );
330 :
331 8 : if( m_xrif_timing_header )
332 4 : free( m_xrif_timing_header );
333 :
334 8 : return;
335 8 : }
336 :
337 0 : void streamWriter::setupConfig()
338 : {
339 0 : config.add( "writer.savePath",
340 : "",
341 : "writer.savePath",
342 : argType::Required,
343 : "writer",
344 : "savePath",
345 : false,
346 : "string",
347 : "The absolute path where images are saved. Will use MagAO-X default if not set." );
348 :
349 0 : config.add( "writer.maxCircBuffLength",
350 : "",
351 : "writer.maxCircBuffLength",
352 : argType::Required,
353 : "writer",
354 : "maxCircBuffLength",
355 : false,
356 : "size_t",
357 : "The maximum length in frames of the circular buffer. Should be an integer multiple of and larger than "
358 : "maxWriteChunkLength." );
359 :
360 0 : config.add( "writer.maxCircBuffSize",
361 : "",
362 : "writer.maxCircBuffSize",
363 : argType::Required,
364 : "writer",
365 : "maxCircBuffSize",
366 : false,
367 : "double",
368 : "The maximum size in MB of the circular buffer. Should be sized to hold at least 2 of the maximum "
369 : "frame size." );
370 :
371 0 : config.add(
372 : "writer.maxWriteChunkLength",
373 : "",
374 : "writer.maxWriteChunkLength",
375 : argType::Required,
376 : "writer",
377 : "maxWriteChunkLength",
378 : false,
379 : "size_t",
380 : "The maximum length in frames of the chunks to write to disk. Should be smaller than maxCircBuffLength." );
381 :
382 0 : config.add( "writer.maxChunkTime",
383 : "",
384 : "writer.maxChunkTime",
385 : argType::Required,
386 : "writer",
387 : "maxChunkTime",
388 : false,
389 : "float",
390 : "The max length in seconds of the chunks to write to disk. Default is 60 sec." );
391 :
392 0 : config.add( "writer.threadPrio",
393 : "",
394 : "writer.threadPrio",
395 : argType::Required,
396 : "writer",
397 : "threadPrio",
398 : false,
399 : "int",
400 : "The real-time priority of the stream writer thread." );
401 :
402 0 : config.add( "writer.cpuset",
403 : "",
404 : "writer.cpuset",
405 : argType::Required,
406 : "writer",
407 : "cpuset",
408 : false,
409 : "int",
410 : "The cpuset for the writer thread." );
411 :
412 0 : config.add( "writer.compress",
413 : "",
414 : "writer.compress",
415 : argType::Required,
416 : "writer",
417 : "compress",
418 : false,
419 : "bool",
420 : "Flag to set whether compression is used. Default true." );
421 :
422 0 : config.add( "writer.lz4accel",
423 : "",
424 : "writer.lz4accel",
425 : argType::Required,
426 : "writer",
427 : "lz4accel",
428 : false,
429 : "int",
430 : "The LZ4 acceleration parameter. Larger is faster, but lower compression." );
431 :
432 0 : config.add( "writer.outName",
433 : "",
434 : "writer.outName",
435 : argType::Required,
436 : "writer",
437 : "outName",
438 : false,
439 : "int",
440 : "The name to use for output files. Default is the shmimName." );
441 :
442 0 : config.add( "framegrabber.shmimName",
443 : "",
444 : "framegrabber.shmimName",
445 : argType::Required,
446 : "framegrabber",
447 : "shmimName",
448 : false,
449 : "int",
450 : "The name of the stream to monitor. From /tmp/shmimName.im.shm." );
451 :
452 0 : config.add( "framegrabber.semaphoreNumber",
453 : "",
454 : "framegrabber.semaphoreNumber",
455 : argType::Required,
456 : "framegrabber",
457 : "semaphoreNumber",
458 : false,
459 : "int",
460 : "The semaphore to wait on. Default is 7." );
461 :
462 0 : config.add( "framegrabber.semWait",
463 : "",
464 : "framegrabber.semWait",
465 : argType::Required,
466 : "framegrabber",
467 : "semWait",
468 : false,
469 : "int",
470 : "The time in nsec to wait on the semaphore. Max is 999999999. Default is 5e8 nsec." );
471 :
472 0 : config.add( "framegrabber.threadPrio",
473 : "",
474 : "framegrabber.threadPrio",
475 : argType::Required,
476 : "framegrabber",
477 : "threadPrio",
478 : false,
479 : "int",
480 : "The real-time priority of the framegrabber thread." );
481 :
482 0 : config.add( "framegrabber.cpuset",
483 : "",
484 : "framegrabber.cpuset",
485 : argType::Required,
486 : "framegrabber",
487 : "cpuset",
488 : false,
489 : "string",
490 : "The cpuset for the framegrabber thread." );
491 :
492 0 : telemeterT::setupConfig( config );
493 0 : }
494 :
495 0 : void streamWriter::loadConfig()
496 : {
497 :
498 0 : config( m_maxCircBuffLength, "writer.maxCircBuffLength" );
499 0 : config( m_maxCircBuffSize, "writer.maxCircBuffSize" );
500 0 : config( m_maxWriteChunkLength, "writer.maxWriteChunkLength" );
501 0 : config( m_maxChunkTime, "writer.maxChunkTime" );
502 0 : config( m_swThreadPrio, "writer.threadPrio" );
503 0 : config( m_swCpuset, "writer.cpuset" );
504 0 : config( m_compress, "writer.compress" );
505 0 : config( m_lz4accel, "writer.lz4accel" );
506 0 : if( m_lz4accel < XRIF_LZ4_ACCEL_MIN )
507 : {
508 0 : m_lz4accel = XRIF_LZ4_ACCEL_MIN;
509 : }
510 0 : if( m_lz4accel > XRIF_LZ4_ACCEL_MAX )
511 : {
512 0 : m_lz4accel = XRIF_LZ4_ACCEL_MAX;
513 : }
514 :
515 0 : config( m_shmimName, "framegrabber.shmimName" );
516 :
517 0 : m_outName = m_shmimName;
518 0 : config( m_outName, "writer.outName" );
519 :
520 0 : config( m_semaphoreNumber, "framegrabber.semaphoreNumber" );
521 0 : config( m_semWaitNSec, "framegrabber.semWait" );
522 :
523 0 : config( m_fgThreadPrio, "framegrabber.threadPrio" );
524 0 : config( m_fgCpuset, "framegrabber.cpuset" );
525 :
526 : // Set some defaults
527 : // Setup default saving path
528 0 : std::string tmpstr = mx::sys::getEnv( MAGAOX_env_rawimage );
529 0 : if( tmpstr == "" )
530 : {
531 0 : tmpstr = MAGAOX_rawimageRelPath;
532 : }
533 0 : m_rawimageDir = basePath() + "/" + tmpstr ;
534 :
535 0 : config( m_rawimageDir, "writer.savePath" );
536 :
537 0 : if( telemeterT::loadConfig( config ) < 0 )
538 : {
539 0 : log<text_log>( "Error during telemeter config", logPrio::LOG_CRITICAL );
540 0 : m_shutdown = true;
541 : }
542 0 : }
543 :
544 0 : int streamWriter::appStartup()
545 : {
546 : // Create save directory.
547 0 : errno = 0;
548 0 : if( mkdir( m_rawimageDir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH ) < 0 )
549 : {
550 0 : if( errno != EEXIST )
551 : {
552 0 : std::stringstream logss;
553 0 : logss << "Failed to create image directory (" << m_rawimageDir << "). Errno says: " << strerror( errno );
554 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, logss.str() } );
555 :
556 0 : return -1;
557 0 : }
558 : }
559 :
560 : // set up the INDI properties
561 0 : createStandardIndiToggleSw( m_indiP_writing, "writing" );
562 0 : registerIndiPropertyNew( m_indiP_writing, INDI_NEWCALLBACK( m_indiP_writing ) );
563 :
564 : // Register the stats INDI property
565 0 : REG_INDI_NEWPROP_NOCB( m_indiP_xrifStats, "xrif", pcf::IndiProperty::Number );
566 0 : m_indiP_xrifStats.setLabel( "xrif compression performance" );
567 :
568 0 : indi::addNumberElement<float>( m_indiP_xrifStats, "ratio", 0, 1.0, 0.0, "%0.2f", "Compression Ratio" );
569 :
570 0 : indi::addNumberElement<float>( m_indiP_xrifStats,
571 : "differenceMBsec",
572 : 0,
573 0 : std::numeric_limits<float>::max(),
574 0 : 0.0,
575 : "%0.2f",
576 : "Differencing Rate [MB/sec]" );
577 :
578 0 : indi::addNumberElement<float>( m_indiP_xrifStats,
579 : "reorderMBsec",
580 : 0,
581 0 : std::numeric_limits<float>::max(),
582 0 : 0.0,
583 : "%0.2f",
584 : "Reordering Rate [MB/sec]" );
585 :
586 0 : indi::addNumberElement<float>( m_indiP_xrifStats,
587 : "compressMBsec",
588 : 0,
589 0 : std::numeric_limits<float>::max(),
590 0 : 0.0,
591 : "%0.2f",
592 : "Compression Rate [MB/sec]" );
593 :
594 0 : indi::addNumberElement<float>( m_indiP_xrifStats,
595 : "encodeMBsec",
596 : 0,
597 0 : std::numeric_limits<float>::max(),
598 0 : 0.0,
599 : "%0.2f",
600 : "Total Encoding Rate [MB/sec]" );
601 :
602 0 : indi::addNumberElement<float>( m_indiP_xrifStats,
603 : "differenceFPS",
604 : 0,
605 0 : std::numeric_limits<float>::max(),
606 0 : 0.0,
607 : "%0.2f",
608 : "Differencing Rate [f.p.s.]" );
609 :
610 0 : indi::addNumberElement<float>( m_indiP_xrifStats,
611 : "reorderFPS",
612 : 0,
613 0 : std::numeric_limits<float>::max(),
614 0 : 0.0,
615 : "%0.2f",
616 : "Reordering Rate [f.p.s.]" );
617 :
618 0 : indi::addNumberElement<float>( m_indiP_xrifStats,
619 : "compressFPS",
620 : 0,
621 0 : std::numeric_limits<float>::max(),
622 0 : 0.0,
623 : "%0.2f",
624 : "Compression Rate [f.p.s.]" );
625 :
626 0 : indi::addNumberElement<float>( m_indiP_xrifStats,
627 : "encodeFPS",
628 : 0,
629 0 : std::numeric_limits<float>::max(),
630 0 : 0.0,
631 : "%0.2f",
632 : "Total Encoding Rate [f.p.s.]" );
633 :
634 : // Now set up the framegrabber and writer threads.
635 : // - need SIGSEGV and SIGBUS handling for ImageStreamIO restarts
636 : // - initialize the semaphore
637 : // - start the threads
638 :
639 0 : if( setSigSegvHandler() < 0 )
640 0 : return log<software_error, -1>( { __FILE__, __LINE__ } );
641 :
642 0 : if( sem_init( &m_swSemaphore, 0, 0 ) < 0 )
643 0 : return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "Initializing S.W. semaphore" } );
644 :
645 : // Check if we have a safe writeChunkLengthh
646 0 : if( m_maxCircBuffLength % m_maxWriteChunkLength != 0 )
647 : {
648 0 : return log<software_critical, -1>(
649 0 : { __FILE__, __LINE__, "Write chunk length is not a divisor of circular buffer length." } );
650 : }
651 :
652 0 : if( initialize_xrif() < 0 )
653 : {
654 0 : log<software_critical, -1>( { __FILE__, __LINE__ } );
655 : }
656 :
657 0 : if( threadStart( m_fgThread,
658 0 : m_fgThreadInit,
659 0 : m_fgThreadID,
660 0 : m_fgThreadProp,
661 : m_fgThreadPrio,
662 0 : m_fgCpuset,
663 : "framegrabber",
664 : this,
665 0 : fgThreadStart ) < 0 )
666 : {
667 0 : return log<software_critical, -1>( { __FILE__, __LINE__ } );
668 : }
669 :
670 0 : if( threadStart( m_swThread,
671 0 : m_swThreadInit,
672 0 : m_swThreadID,
673 0 : m_swThreadProp,
674 : m_swThreadPrio,
675 0 : m_swCpuset,
676 : "streamwriter",
677 : this,
678 0 : swThreadStart ) < 0 )
679 : {
680 0 : log<software_critical, -1>( { __FILE__, __LINE__ } );
681 : }
682 :
683 0 : if( telemeterT::appStartup() < 0 )
684 : {
685 0 : return log<software_error, -1>( { __FILE__, __LINE__ } );
686 : }
687 :
688 0 : return 0;
689 : }
690 :
691 0 : int streamWriter::appLogic()
692 : {
693 :
694 : // first do a join check to see if other threads have exited.
695 : // these will throw if the threads are really gone
696 : try
697 : {
698 0 : if( pthread_tryjoin_np( m_fgThread.native_handle(), 0 ) == 0 )
699 : {
700 0 : log<software_error>( { __FILE__, __LINE__, "framegrabber thread has exited" } );
701 0 : return -1;
702 : }
703 : }
704 0 : catch( ... )
705 : {
706 0 : log<software_error>( { __FILE__, __LINE__, "framegrabber thread has exited" } );
707 0 : return -1;
708 0 : }
709 :
710 : try
711 : {
712 0 : if( pthread_tryjoin_np( m_swThread.native_handle(), 0 ) == 0 )
713 : {
714 0 : log<software_error>( { __FILE__, __LINE__, "stream thread has exited" } );
715 0 : return -1;
716 : }
717 : }
718 0 : catch( ... )
719 : {
720 0 : log<software_error>( { __FILE__, __LINE__, "streamwriter thread has exited" } );
721 0 : return -1;
722 0 : }
723 :
724 0 : switch( m_writing )
725 : {
726 0 : case NOT_WRITING:
727 0 : state( stateCodes::READY );
728 0 : break;
729 0 : default:
730 0 : state( stateCodes::OPERATING );
731 : }
732 :
733 0 : if( state() == stateCodes::OPERATING )
734 : {
735 0 : if( telemeterT::appLogic() < 0 )
736 : {
737 0 : log<software_error>( { __FILE__, __LINE__ } );
738 0 : return 0;
739 : }
740 : }
741 :
742 0 : updateINDI();
743 :
744 0 : return 0;
745 : }
746 :
747 0 : int streamWriter::appShutdown()
748 : {
749 0 : m_writing = NOT_WRITING;
750 0 : updateINDI();
751 :
752 : try
753 : {
754 0 : if( m_fgThread.joinable() )
755 : {
756 0 : m_fgThread.join();
757 : }
758 : }
759 0 : catch( ... )
760 : {
761 0 : }
762 :
763 : try
764 : {
765 0 : if( m_swThread.joinable() )
766 : {
767 0 : m_swThread.join();
768 : }
769 : }
770 0 : catch( ... )
771 : {
772 0 : }
773 :
774 0 : if( m_xrif )
775 : {
776 0 : xrif_delete( m_xrif );
777 0 : m_xrif = nullptr;
778 : }
779 :
780 0 : if( m_xrif_timing )
781 : {
782 0 : xrif_delete( m_xrif_timing );
783 0 : m_xrif_timing = nullptr;
784 : }
785 :
786 0 : telemeterT::appShutdown();
787 :
788 0 : return 0;
789 : }
790 :
791 4 : int streamWriter::initialize_xrif()
792 : {
793 4 : xrif_error_t rv = xrif_new( &m_xrif );
794 4 : if( rv != XRIF_NOERROR )
795 : {
796 0 : return log<software_critical, -1>(
797 0 : { __FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error." } );
798 : }
799 :
800 4 : if( m_compress )
801 : {
802 4 : rv = xrif_configure( m_xrif, XRIF_DIFFERENCE_PREVIOUS, XRIF_REORDER_BYTEPACK, XRIF_COMPRESS_LZ4 );
803 4 : if( rv != XRIF_NOERROR )
804 : {
805 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
806 : }
807 : }
808 : else
809 : {
810 0 : std::cerr << "not compressing . . . \n";
811 0 : rv = xrif_configure( m_xrif, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE );
812 0 : if( rv != XRIF_NOERROR )
813 : {
814 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
815 : }
816 : }
817 :
818 4 : errno = 0;
819 :
820 4 : m_xrif_header = reinterpret_cast<char *>( malloc( XRIF_HEADER_SIZE * sizeof( char ) ) );
821 4 : if( m_xrif_header == NULL )
822 : {
823 0 : return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "xrif header allocation failed." } );
824 : }
825 :
826 4 : rv = xrif_new( &m_xrif_timing );
827 4 : if( rv != XRIF_NOERROR )
828 : {
829 0 : return log<software_critical, -1>(
830 0 : { __FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error." } );
831 : }
832 :
833 : // m_xrif_timing->reorder_method = XRIF_REORDER_NONE;
834 4 : rv = xrif_configure( m_xrif_timing, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE );
835 4 : if( rv != XRIF_NOERROR )
836 : {
837 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
838 : }
839 :
840 4 : errno = 0;
841 :
842 4 : m_xrif_timing_header = reinterpret_cast<char *>( malloc( XRIF_HEADER_SIZE * sizeof( char ) ) );
843 4 : if( m_xrif_timing_header == NULL )
844 : {
845 0 : return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "xrif header allocation failed." } );
846 : }
847 :
848 4 : return 0;
849 : }
850 :
851 0 : int streamWriter::setSigSegvHandler()
852 : {
853 : struct sigaction act;
854 : sigset_t set;
855 :
856 0 : act.sa_sigaction = &streamWriter::_handlerSigSegv;
857 0 : act.sa_flags = SA_SIGINFO;
858 0 : sigemptyset( &set );
859 0 : act.sa_mask = set;
860 :
861 0 : errno = 0;
862 0 : if( sigaction( SIGSEGV, &act, 0 ) < 0 )
863 : {
864 0 : std::string logss = "Setting handler for SIGSEGV failed. Errno says: ";
865 0 : logss += strerror( errno );
866 :
867 0 : log<software_error>( { __FILE__, __LINE__, errno, 0, logss } );
868 :
869 0 : return -1;
870 0 : }
871 :
872 0 : errno = 0;
873 0 : if( sigaction( SIGBUS, &act, 0 ) < 0 )
874 : {
875 0 : std::string logss = "Setting handler for SIGBUS failed. Errno says: ";
876 0 : logss += strerror( errno );
877 :
878 0 : log<software_error>( { __FILE__, __LINE__, errno, 0, logss } );
879 :
880 0 : return -1;
881 0 : }
882 :
883 0 : log<text_log>( "Installed SIGSEGV/SIGBUS signal handler.", logPrio::LOG_DEBUG );
884 :
885 0 : return 0;
886 : }
887 :
888 0 : void streamWriter::_handlerSigSegv( int signum, siginfo_t *siginf, void *ucont )
889 : {
890 0 : m_selfWriter->handlerSigSegv( signum, siginf, ucont );
891 0 : }
892 :
893 0 : void streamWriter::handlerSigSegv( int signum, siginfo_t *siginf, void *ucont )
894 : {
895 : static_cast<void>( signum );
896 : static_cast<void>( siginf );
897 : static_cast<void>( ucont );
898 :
899 0 : m_restart = true;
900 :
901 0 : return;
902 : }
903 :
904 12 : void streamWriter::getCircBuffLengths( size_t &circBuffLength,
905 : double &circBuffSize,
906 : size_t &writeChunkLength,
907 : size_t maxCircBuffLength,
908 : double maxCircBuffSize,
909 : size_t maxWriteChunkLength,
910 : uint32_t width,
911 : uint32_t height,
912 : size_t typeSize )
913 : {
914 : static constexpr double MB = 1048576.0;
915 :
916 12 : size_t isz = width * height * typeSize * maxCircBuffLength;
917 :
918 12 : if( isz <= maxCircBuffSize * MB )
919 : {
920 6 : circBuffLength = maxCircBuffLength;
921 6 : circBuffSize = isz / MB;
922 6 : writeChunkLength = maxWriteChunkLength;
923 :
924 6 : return;
925 : }
926 :
927 6 : circBuffLength = maxCircBuffSize * MB / ( width * height * typeSize );
928 :
929 6 : if( circBuffLength % 2 == 1 )
930 : {
931 1 : --circBuffLength;
932 : }
933 :
934 6 : circBuffSize = ( width * height * typeSize * circBuffLength ) / MB;
935 :
936 6 : writeChunkLength = ( 1.0 * maxWriteChunkLength / maxCircBuffLength ) * circBuffLength;
937 :
938 6 : if( circBuffLength == 0 )
939 : {
940 2 : return;
941 : }
942 :
943 4 : if( writeChunkLength == 0 )
944 : {
945 0 : writeChunkLength = 1;
946 : }
947 :
948 4 : while( circBuffLength % writeChunkLength != 0 )
949 : {
950 0 : --writeChunkLength;
951 : }
952 :
953 4 : return;
954 : }
955 :
956 4 : int streamWriter::allocate_circbufs()
957 : {
958 :
959 4 : getCircBuffLengths( m_circBuffLength,
960 4 : m_circBuffSize,
961 4 : m_writeChunkLength,
962 : m_maxCircBuffLength,
963 : m_maxCircBuffSize,
964 : m_maxWriteChunkLength,
965 4 : m_width,
966 4 : m_height,
967 4 : m_typeSize );
968 :
969 4 : if( m_circBuffLength < 2 )
970 : {
971 0 : return log<software_critical, -1>( { __FILE__, __LINE__, "frame size too large to fit in maxCircBuffSize" } );
972 : }
973 :
974 4 : if( m_writeChunkLength >= m_circBuffLength )
975 : {
976 0 : return log<software_critical, -1>(
977 0 : { __FILE__, __LINE__, "writeChunkLength is not smaller than circBuffLength" } );
978 : }
979 :
980 4 : if( m_circBuffLength % m_writeChunkLength != 0 )
981 : {
982 0 : return log<software_critical, -1>(
983 0 : { __FILE__, __LINE__, "writeChunkLength is not an integer factor of circBuffLength" } );
984 : }
985 :
986 4 : std::string msg = "Set circ buff length: " + std::to_string( m_circBuffLength ) + " frames (";
987 4 : msg += std::to_string( m_circBuffSize ) + " MB). Write chunk length: " + std::to_string( m_writeChunkLength );
988 4 : msg += " frames.";
989 :
990 4 : log<text_log>( msg, logPrio::LOG_NOTICE );
991 :
992 4 : if( m_rawImageCircBuff )
993 : {
994 0 : free( m_rawImageCircBuff );
995 : }
996 :
997 4 : errno = 0;
998 4 : m_rawImageCircBuff = reinterpret_cast<char *>( malloc( m_width * m_height * m_typeSize * m_circBuffLength ) );
999 :
1000 4 : if( m_rawImageCircBuff == NULL )
1001 : {
1002 0 : return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "buffer allocation failure" } );
1003 : }
1004 :
1005 4 : if( m_timingCircBuff )
1006 : {
1007 0 : free( m_timingCircBuff );
1008 : }
1009 :
1010 4 : errno = 0;
1011 4 : m_timingCircBuff = reinterpret_cast<uint64_t *>( malloc( 5 * sizeof( uint64_t ) * m_circBuffLength ) );
1012 4 : if( m_timingCircBuff == NULL )
1013 : {
1014 0 : return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "buffer allocation failure" } );
1015 : }
1016 :
1017 4 : return 0;
1018 4 : }
1019 :
1020 4 : int streamWriter::allocate_xrif()
1021 : {
1022 : // Set up the image data xrif handle
1023 : xrif_error_t rv;
1024 :
1025 4 : if( m_compress )
1026 : {
1027 4 : rv = xrif_configure( m_xrif, XRIF_DIFFERENCE_PREVIOUS, XRIF_REORDER_BYTEPACK, XRIF_COMPRESS_LZ4 );
1028 4 : if( rv != XRIF_NOERROR )
1029 : {
1030 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1031 : }
1032 : }
1033 : else
1034 : {
1035 0 : std::cerr << "not compressing . . . \n";
1036 0 : rv = xrif_configure( m_xrif, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE );
1037 0 : if( rv != XRIF_NOERROR )
1038 : {
1039 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1040 : }
1041 : }
1042 :
1043 4 : rv = xrif_set_size( m_xrif, m_width, m_height, 1, m_writeChunkLength, m_dataType );
1044 4 : if( rv != XRIF_NOERROR )
1045 : {
1046 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_set_size error." } );
1047 : }
1048 :
1049 4 : rv = xrif_allocate_raw( m_xrif );
1050 4 : if( rv != XRIF_NOERROR )
1051 : {
1052 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_raw error." } );
1053 : }
1054 :
1055 4 : rv = xrif_allocate_reordered( m_xrif );
1056 4 : if( rv != XRIF_NOERROR )
1057 : {
1058 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error." } );
1059 : }
1060 :
1061 : // Set up the timing data xrif handle
1062 4 : rv = xrif_configure( m_xrif_timing, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE );
1063 4 : if( rv != XRIF_NOERROR )
1064 : {
1065 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1066 : }
1067 :
1068 4 : rv = xrif_set_size( m_xrif_timing, 5, 1, 1, m_writeChunkLength, XRIF_TYPECODE_UINT64 );
1069 4 : if( rv != XRIF_NOERROR )
1070 : {
1071 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_set_size error." } );
1072 : }
1073 :
1074 4 : rv = xrif_allocate_raw( m_xrif_timing );
1075 4 : if( rv != XRIF_NOERROR )
1076 : {
1077 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_raw error." } );
1078 : }
1079 :
1080 4 : rv = xrif_allocate_reordered( m_xrif_timing );
1081 4 : if( rv != XRIF_NOERROR )
1082 : {
1083 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error." } );
1084 : }
1085 :
1086 4 : return 0;
1087 : }
1088 :
1089 0 : void streamWriter::fgThreadStart( streamWriter *o )
1090 : {
1091 0 : o->fgThreadExec();
1092 0 : }
1093 :
1094 0 : void streamWriter::fgThreadExec()
1095 : {
1096 0 : m_fgThreadID = syscall( SYS_gettid );
1097 :
1098 : // Wait fpr the thread starter to finish initializing this thread.
1099 0 : while( m_fgThreadInit == true && m_shutdown == 0 )
1100 : {
1101 0 : sleep( 1 );
1102 : }
1103 :
1104 : timespec missing_ts;
1105 :
1106 : IMAGE image;
1107 0 : ino_t inode = 0; // The inode of the image stream file
1108 :
1109 0 : bool opened = false;
1110 :
1111 0 : while( m_shutdown == 0 )
1112 : {
1113 : /* Initialize ImageStreamIO
1114 : */
1115 0 : opened = false;
1116 0 : m_restart = false; // Set this up front, since we're about to restart.
1117 :
1118 0 : sem_t *sem{ nullptr }; ///< The semaphore to monitor for new image data
1119 :
1120 0 : int logged = 0;
1121 0 : while( !opened && !m_shutdown && !m_restart )
1122 : {
1123 : // b/c ImageStreamIO prints every single time, and latest version don't support stopping it yet,
1124 : // and that isn't thread-safe-able anyway
1125 : // we do our own checks. This is the same code in ImageStreamIO_openIm...
1126 : int SM_fd;
1127 : char SM_fname[200];
1128 0 : ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1129 0 : SM_fd = open( SM_fname, O_RDWR );
1130 0 : if( SM_fd == -1 )
1131 : {
1132 0 : if( !logged )
1133 : {
1134 0 : log<text_log>( "ImageStream " + m_shmimName + " not found (yet). Retrying . . .",
1135 : logPrio::LOG_NOTICE );
1136 : }
1137 0 : logged = 1;
1138 0 : sleep( 1 ); // be patient
1139 0 : continue;
1140 : }
1141 :
1142 : // Found and opened, close it and then use ImageStreamIO
1143 0 : logged = 0;
1144 0 : close( SM_fd );
1145 :
1146 0 : if( ImageStreamIO_openIm( &image, m_shmimName.c_str() ) == 0 )
1147 : {
1148 0 : if( image.md[0].sem < SEMAPHORE_MAXVAL )
1149 : {
1150 0 : ImageStreamIO_closeIm( &image );
1151 0 : mx::sys::sleep( 1 ); // We just need to wait for the server process to finish startup.
1152 : }
1153 : else
1154 : {
1155 0 : opened = true;
1156 :
1157 : char SM_fname[200];
1158 0 : ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1159 :
1160 : struct stat buffer;
1161 0 : int rv = stat( SM_fname, &buffer );
1162 :
1163 0 : if( rv != 0 )
1164 : {
1165 0 : log<software_critical>( { __FILE__,
1166 : __LINE__,
1167 0 : errno,
1168 0 : "Could not get inode for " + m_shmimName +
1169 : ". Source process will need to be restarted." } );
1170 0 : ImageStreamIO_closeIm( &image );
1171 0 : return;
1172 : }
1173 0 : inode = buffer.st_ino;
1174 : }
1175 : }
1176 : else
1177 : {
1178 0 : mx::sys::sleep( 1 ); // be patient
1179 : }
1180 : }
1181 :
1182 0 : if( m_restart )
1183 0 : continue; // this is kinda dumb. we just go around on restart, so why test in the while loop at all?
1184 :
1185 0 : if( m_shutdown || !opened )
1186 : {
1187 0 : if( !opened )
1188 0 : return;
1189 :
1190 0 : ImageStreamIO_closeIm( &image );
1191 0 : return;
1192 : }
1193 :
1194 : // now get a good semaphore
1195 0 : m_semaphoreNumber =
1196 0 : ImageStreamIO_getsemwaitindex( &image, m_semaphoreNumber ); // ask for semaphore we had before
1197 :
1198 0 : if( m_semaphoreNumber < 0 )
1199 : {
1200 0 : log<software_critical>(
1201 : { __FILE__,
1202 : __LINE__,
1203 0 : "No valid semaphore found for " + m_shmimName + ". Source process will need to be restarted." } );
1204 0 : return;
1205 : }
1206 :
1207 0 : log<software_info>( { __FILE__,
1208 : __LINE__,
1209 0 : "got semaphore index " + std::to_string( m_semaphoreNumber ) + " for " + m_shmimName } );
1210 :
1211 0 : ImageStreamIO_semflush( &image, m_semaphoreNumber );
1212 :
1213 0 : sem = image.semptr[m_semaphoreNumber]; ///< The semaphore to monitor for new image data
1214 :
1215 0 : m_dataType = image.md[0].datatype;
1216 0 : m_typeSize = ImageStreamIO_typesize( m_dataType );
1217 0 : m_width = image.md[0].size[0];
1218 0 : m_height = image.md[0].size[1];
1219 : size_t length;
1220 0 : if( image.md[0].naxis == 3 )
1221 : {
1222 0 : length = image.md[0].size[2];
1223 : }
1224 : else
1225 : {
1226 0 : length = 1;
1227 : }
1228 : std::cerr << "connected"
1229 0 : << " " << m_width << "x" << m_height << "x" << (int)m_dataType << " (" << m_typeSize << ")"
1230 0 : << std::endl;
1231 :
1232 : // Now allocate the circBuffs
1233 0 : if( allocate_circbufs() < 0 )
1234 : {
1235 0 : return; // will cause shutdown!
1236 : }
1237 :
1238 : // And allocate the xrifs
1239 0 : if( allocate_xrif() < 0 )
1240 : {
1241 0 : return; // Will cause shutdown!
1242 : }
1243 :
1244 : uint8_t atype;
1245 : size_t snx, sny, snz;
1246 :
1247 : uint64_t curr_image; // The current cnt1 index
1248 0 : m_currImage = 0;
1249 0 : m_currChunkStart = 0;
1250 0 : m_nextChunkStart = 0;
1251 :
1252 : // Initialized curr_image ...
1253 0 : if( image.md[0].naxis > 2 )
1254 : {
1255 0 : curr_image = image.md[0].cnt1;
1256 : }
1257 : else
1258 : {
1259 0 : curr_image = 0;
1260 : }
1261 :
1262 : uint64_t last_cnt0; // = ((uint64_t)-1);
1263 :
1264 : // so we can initialize last_cnt0 to avoid frame skip on startup
1265 0 : if( image.cntarray )
1266 : {
1267 0 : last_cnt0 = image.cntarray[curr_image];
1268 : }
1269 : else
1270 : {
1271 0 : last_cnt0 = image.md[0].cnt0;
1272 : }
1273 :
1274 0 : int cnt0flag = 0;
1275 :
1276 0 : bool restartWriting = false; // flag to prevent logging on a logging restart
1277 :
1278 : // This is the main image grabbing loop.
1279 0 : while( !m_shutdown && !m_restart )
1280 : {
1281 : timespec ts;
1282 0 : XWC_SEM_WAIT_TS_RETVOID( ts, m_semWaitSec, m_semWaitNSec );
1283 :
1284 0 : if( sem_timedwait( sem, &ts ) == 0 )
1285 : {
1286 0 : if( image.md[0].naxis > 2 )
1287 : {
1288 0 : curr_image = image.md[0].cnt1;
1289 : }
1290 : else
1291 : {
1292 0 : curr_image = 0;
1293 : }
1294 :
1295 0 : atype = image.md[0].datatype;
1296 0 : snx = image.md[0].size[0];
1297 0 : sny = image.md[0].size[1];
1298 0 : if( image.md[0].naxis == 3 )
1299 : {
1300 0 : snz = image.md[0].size[2];
1301 : }
1302 : else
1303 : {
1304 0 : snz = 1;
1305 : }
1306 :
1307 0 : if( atype != m_dataType || snx != m_width || sny != m_height || snz != length )
1308 : {
1309 : break; // exit the nearest while loop and get the new image setup.
1310 : }
1311 :
1312 0 : if( m_shutdown || m_restart )
1313 : {
1314 : break; // Check for exit signals
1315 : }
1316 :
1317 : uint64_t new_cnt0;
1318 0 : if( image.cntarray )
1319 : {
1320 0 : new_cnt0 = image.cntarray[curr_image];
1321 : }
1322 : else
1323 : {
1324 0 : new_cnt0 = image.md[0].cnt0;
1325 : }
1326 :
1327 : #ifdef SW_DEBUG
1328 : std::cerr << "new_cnt0: " << new_cnt0 << "\n";
1329 : #endif
1330 :
1331 : ///\todo cleanup skip frame handling.
1332 0 : if( new_cnt0 == last_cnt0 ) //<- this probably isn't useful really
1333 : {
1334 0 : log<text_log>( "semaphore raised but cnt0 has not changed -- we're probably getting behind",
1335 : logPrio::LOG_WARNING );
1336 0 : ++cnt0flag;
1337 0 : if( cnt0flag > 10 )
1338 : {
1339 0 : m_restart = true; // if we get here 10 times then something else is wrong.
1340 : }
1341 0 : continue;
1342 : }
1343 :
1344 0 : if( new_cnt0 - last_cnt0 > 1 ) //<- this is what we want to check.
1345 : {
1346 0 : log<text_log>( "cnt0 changed by more than 1. Frame skipped.", logPrio::LOG_WARNING );
1347 : }
1348 :
1349 0 : cnt0flag = 0;
1350 :
1351 0 : last_cnt0 = new_cnt0;
1352 :
1353 0 : char *curr_dest = m_rawImageCircBuff + m_currImage * m_width * m_height * m_typeSize;
1354 0 : char *curr_src =
1355 0 : reinterpret_cast<char *>( image.array.raw ) + curr_image * m_width * m_height * m_typeSize;
1356 :
1357 0 : memcpy( curr_dest, curr_src, m_width * m_height * m_typeSize );
1358 :
1359 0 : uint64_t *curr_timing = m_timingCircBuff + 5 * m_currImage;
1360 :
1361 0 : if( image.cntarray )
1362 : {
1363 0 : curr_timing[0] = image.cntarray[curr_image];
1364 0 : curr_timing[1] = image.atimearray[curr_image].tv_sec;
1365 0 : curr_timing[2] = image.atimearray[curr_image].tv_nsec;
1366 0 : curr_timing[3] = image.writetimearray[curr_image].tv_sec;
1367 0 : curr_timing[4] = image.writetimearray[curr_image].tv_nsec;
1368 : }
1369 : else
1370 : {
1371 0 : curr_timing[0] = image.md[0].cnt0;
1372 0 : curr_timing[1] = image.md[0].atime.tv_sec;
1373 0 : curr_timing[2] = image.md[0].atime.tv_nsec;
1374 0 : curr_timing[3] = image.md[0].writetime.tv_sec;
1375 0 : curr_timing[4] = image.md[0].writetime.tv_nsec;
1376 : }
1377 :
1378 : // Check if we need to time-stamp ourselves -- for old cacao streams
1379 0 : if( curr_timing[1] == 0 )
1380 : {
1381 :
1382 0 : if( clock_gettime( CLOCK_REALTIME, &missing_ts ) < 0 )
1383 : {
1384 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "clock_gettime" } );
1385 0 : return;
1386 : }
1387 :
1388 0 : curr_timing[1] = missing_ts.tv_sec;
1389 0 : curr_timing[2] = missing_ts.tv_nsec;
1390 : }
1391 :
1392 : // just set w-time to a-time if it's missing
1393 0 : if( curr_timing[3] == 0 )
1394 : {
1395 0 : curr_timing[3] = curr_timing[1];
1396 0 : curr_timing[4] = curr_timing[2];
1397 : }
1398 :
1399 0 : m_currImageTime = 1.0 * curr_timing[3] + ( 1.0 * curr_timing[4] ) / 1e9;
1400 :
1401 0 : if( m_shutdown && m_writing == WRITING )
1402 : {
1403 0 : m_writing = STOP_WRITING;
1404 : }
1405 :
1406 0 : switch( m_writing )
1407 : {
1408 0 : case START_WRITING:
1409 :
1410 0 : m_currChunkStart = m_currImage;
1411 0 : m_nextChunkStart = ( m_currImage / m_writeChunkLength ) * m_writeChunkLength;
1412 0 : m_currChunkStartTime = m_currImageTime;
1413 :
1414 0 : if( !restartWriting ) // We only log if this is really a start
1415 : {
1416 0 : log<saving_start>( { 1, new_cnt0 } );
1417 : }
1418 : else // on a restart after a timeout we don't log
1419 : {
1420 0 : restartWriting = false;
1421 : }
1422 :
1423 0 : m_writing = WRITING;
1424 :
1425 : // fall through
1426 0 : case WRITING:
1427 0 : if( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 )
1428 : {
1429 0 : m_currSaveStart = m_currChunkStart;
1430 0 : m_currSaveStop = m_nextChunkStart + m_writeChunkLength;
1431 0 : m_currSaveStopFrameNo = new_cnt0;
1432 :
1433 : #ifdef SW_DEBUG
1434 : std::cerr << __FILE__ << " " << __LINE__ << " WRITING " << m_currImage << " "
1435 : << m_nextChunkStart << " "
1436 : << ( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 ) << " "
1437 : << ( m_currImageTime - m_currChunkStartTime > m_maxChunkTime ) << " " << new_cnt0
1438 : << "\n";
1439 : #endif
1440 :
1441 : // Now tell the writer to get going
1442 0 : if( sem_post( &m_swSemaphore ) < 0 )
1443 : {
1444 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1445 0 : return;
1446 : }
1447 :
1448 0 : m_nextChunkStart = ( ( m_currImage + 1 ) / m_writeChunkLength ) * m_writeChunkLength;
1449 0 : if( m_nextChunkStart >= m_circBuffLength )
1450 : {
1451 0 : m_nextChunkStart = 0;
1452 : }
1453 :
1454 0 : m_currChunkStart = m_nextChunkStart;
1455 0 : m_currChunkStartTime = m_currImageTime;
1456 : }
1457 0 : else if( m_currImageTime - m_currChunkStartTime > m_maxChunkTime )
1458 : {
1459 0 : m_currSaveStart = m_currChunkStart;
1460 0 : m_currSaveStop = m_currImage + 1;
1461 0 : m_currSaveStopFrameNo = new_cnt0;
1462 :
1463 : #ifdef SW_DEBUG
1464 : std::cerr << __FILE__ << " " << __LINE__ << " IMAGE TIME WRITING " << m_currImage << " "
1465 : << m_nextChunkStart << " "
1466 : << ( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 ) << " "
1467 : << ( m_currImageTime - m_currChunkStartTime > m_maxChunkTime ) << " " << new_cnt0
1468 : << "\n";
1469 : #endif
1470 :
1471 : // Now tell the writer to get going
1472 0 : if( sem_post( &m_swSemaphore ) < 0 )
1473 : {
1474 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1475 0 : return;
1476 : }
1477 :
1478 0 : m_writing = START_WRITING;
1479 0 : restartWriting = true;
1480 : }
1481 0 : break;
1482 :
1483 0 : case STOP_WRITING:
1484 0 : m_currSaveStart = m_currChunkStart;
1485 0 : m_currSaveStop = m_currImage + 1;
1486 0 : m_currSaveStopFrameNo = new_cnt0;
1487 :
1488 : #ifdef SW_DEBUG
1489 : std::cerr << __FILE__ << " " << __LINE__ << " STOP_WRITING\n";
1490 : #endif
1491 :
1492 : // Now tell the writer to get going
1493 0 : if( sem_post( &m_swSemaphore ) < 0 )
1494 : {
1495 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1496 0 : return;
1497 : }
1498 0 : restartWriting = false;
1499 0 : break;
1500 :
1501 0 : default:
1502 0 : break;
1503 : }
1504 :
1505 0 : ++m_currImage;
1506 0 : if( m_currImage >= m_circBuffLength )
1507 : {
1508 0 : m_currImage = 0;
1509 : }
1510 : }
1511 : else
1512 : {
1513 : // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1514 : // we can also get here if a signal interrupts the sem wait which is triggered by INDI callbacks
1515 0 : switch( m_writing )
1516 : {
1517 0 : case WRITING:
1518 : // Here, if there is at least 1 image, we check for delta-time > m_maxChunkTime
1519 : // then write
1520 0 : if( ( m_currImage - m_nextChunkStart > 0 ) &&
1521 0 : ( mx::sys::get_curr_time() - m_currChunkStartTime > m_maxChunkTime ) )
1522 : {
1523 0 : m_currSaveStart = m_currChunkStart;
1524 0 : m_currSaveStop = m_currImage;
1525 0 : m_currSaveStopFrameNo = last_cnt0;
1526 :
1527 : #ifdef SW_DEBUG
1528 : std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT WRITING " << " " << m_currImage << " "
1529 : << m_nextChunkStart << " " << ( m_currImage - m_nextChunkStart ) << " " << last_cnt0
1530 : << "\n";
1531 : #endif
1532 :
1533 : // Now tell the writer to get going
1534 0 : if( sem_post( &m_swSemaphore ) < 0 )
1535 : {
1536 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1537 0 : return;
1538 : }
1539 :
1540 0 : m_writing = START_WRITING;
1541 0 : restartWriting = true;
1542 : }
1543 0 : break;
1544 0 : case STOP_WRITING:
1545 : // If we timed-out while STOP_WRITING is set, we trigger a write.
1546 0 : m_currSaveStart = m_currChunkStart;
1547 0 : m_currSaveStop = m_currImage;
1548 0 : m_currSaveStopFrameNo = last_cnt0;
1549 :
1550 : #ifdef SW_DEBUG
1551 : std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT STOP_WRITING\n";
1552 : #endif
1553 :
1554 : // Now tell the writer to get going
1555 0 : if( sem_post( &m_swSemaphore ) < 0 )
1556 : {
1557 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1558 0 : return;
1559 : }
1560 0 : restartWriting = false;
1561 0 : break;
1562 0 : default:
1563 0 : break;
1564 : }
1565 :
1566 0 : if( image.md[0].sem <= 0 )
1567 : {
1568 0 : break; // Indicates that the server has cleaned up.
1569 : }
1570 :
1571 : // Check for why we timed out
1572 0 : if( errno == EINTR )
1573 : {
1574 0 : break; // This will indicate time to shutdown, loop will exit normally flags set.
1575 : }
1576 :
1577 : // ETIMEDOUT just means we should wait more.
1578 : // Otherwise, report an error.
1579 0 : if( errno != ETIMEDOUT )
1580 : {
1581 0 : log<software_error>( { __FILE__, __LINE__, errno, "sem_timedwait" } );
1582 0 : break;
1583 : }
1584 :
1585 : // Check if the file has disappeared.
1586 : int SM_fd;
1587 : char SM_fname[200];
1588 0 : ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1589 0 : SM_fd = open( SM_fname, O_RDWR );
1590 0 : if( SM_fd == -1 )
1591 : {
1592 0 : m_restart = true;
1593 : }
1594 0 : close( SM_fd );
1595 :
1596 : // Check if the inode changed
1597 : struct stat buffer;
1598 0 : int rv = stat( SM_fname, &buffer );
1599 0 : if( rv != 0 )
1600 : {
1601 0 : m_restart = true;
1602 : }
1603 :
1604 0 : if( buffer.st_ino != inode )
1605 : {
1606 : #ifdef SW_DEBUG
1607 : std::cerr << "Restarting due to inode . . . \n";
1608 : #endif
1609 0 : m_restart = true;
1610 : }
1611 : }
1612 : }
1613 :
1614 : ///\todo might still be writing here, so must check
1615 : // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1616 0 : if( m_writing == WRITING || m_writing == STOP_WRITING )
1617 : {
1618 : // Here, if there is at least 1 image, then write
1619 0 : if( ( m_currImage - m_nextChunkStart > 0 ) )
1620 : {
1621 0 : m_currSaveStart = m_currChunkStart;
1622 0 : m_currSaveStop = m_currImage;
1623 0 : m_currSaveStopFrameNo = last_cnt0;
1624 :
1625 0 : m_writing = STOP_WRITING;
1626 :
1627 0 : std::cerr << __FILE__ << " " << __LINE__ << " WRITING ON RESTART " << last_cnt0 << "\n";
1628 : // Now tell the writer to get going
1629 0 : if( sem_post( &m_swSemaphore ) < 0 )
1630 : {
1631 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1632 0 : return;
1633 : }
1634 : }
1635 : else
1636 : {
1637 0 : m_writing = NOT_WRITING;
1638 : }
1639 :
1640 0 : while( m_writing != NOT_WRITING )
1641 : {
1642 0 : std::cerr << __FILE__ << " " << __LINE__ << " WAITING TO FINISH WRITING " << last_cnt0 << "\n";
1643 0 : sleep( 1 );
1644 : }
1645 : }
1646 :
1647 0 : if( m_rawImageCircBuff )
1648 : {
1649 0 : free( m_rawImageCircBuff );
1650 0 : m_rawImageCircBuff = 0;
1651 : }
1652 :
1653 0 : if( m_timingCircBuff )
1654 : {
1655 0 : free( m_timingCircBuff );
1656 0 : m_timingCircBuff = 0;
1657 : }
1658 :
1659 0 : if( opened )
1660 : {
1661 0 : if( m_semaphoreNumber >= 0 )
1662 : {
1663 : ///\todo is this release necessary with closeIM?
1664 0 : image.semReadPID[m_semaphoreNumber] = 0; // release semaphore
1665 : }
1666 0 : ImageStreamIO_closeIm( &image );
1667 0 : opened = false;
1668 : }
1669 :
1670 : } // outer loop, will exit if m_shutdown==true
1671 :
1672 : // One more check
1673 0 : if( m_rawImageCircBuff )
1674 : {
1675 0 : free( m_rawImageCircBuff );
1676 0 : m_rawImageCircBuff = 0;
1677 : }
1678 :
1679 0 : if( m_timingCircBuff )
1680 : {
1681 0 : free( m_timingCircBuff );
1682 0 : m_timingCircBuff = 0;
1683 : }
1684 :
1685 0 : if( opened )
1686 : {
1687 0 : if( m_semaphoreNumber >= 0 )
1688 : {
1689 : ///\todo is this release necessary with closeIM?
1690 0 : image.semReadPID[m_semaphoreNumber] = 0; // release semaphore.
1691 : }
1692 :
1693 0 : ImageStreamIO_closeIm( &image );
1694 : }
1695 : }
1696 :
1697 0 : void streamWriter::swThreadStart( streamWriter *s )
1698 : {
1699 0 : s->swThreadExec();
1700 0 : }
1701 :
1702 0 : void streamWriter::swThreadExec()
1703 : {
1704 0 : m_swThreadID = syscall( SYS_gettid );
1705 :
1706 : // Wait fpr the thread starter to finish initializing this thread.
1707 0 : while( m_swThreadInit == true && m_shutdown == 0 )
1708 : {
1709 0 : sleep( 1 );
1710 : }
1711 :
1712 0 : while( !m_shutdown )
1713 : {
1714 0 : while( !shutdown() && ( !( state() == stateCodes::READY || state() == stateCodes::OPERATING ) ) )
1715 : {
1716 0 : sleep( 1 );
1717 : }
1718 :
1719 0 : if( shutdown() )
1720 : {
1721 0 : break;
1722 : }
1723 :
1724 : timespec ts;
1725 :
1726 0 : if( clock_gettime( CLOCK_REALTIME, &ts ) < 0 )
1727 : {
1728 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "clock_gettime" } );
1729 0 : return; // will trigger a shutdown
1730 : }
1731 :
1732 0 : mx::sys::timespecAddNsec( ts, m_semWaitNSec );
1733 :
1734 0 : if( sem_timedwait( &m_swSemaphore, &ts ) == 0 )
1735 : {
1736 0 : if( doEncode() < 0 )
1737 : {
1738 0 : log<software_critical>( { __FILE__, __LINE__, "error encoding data" } );
1739 0 : return;
1740 : }
1741 : // Otherwise, success, and we just go on.
1742 : }
1743 : else
1744 : {
1745 : // Check for why we timed out
1746 0 : if( errno == EINTR )
1747 : {
1748 0 : continue; // This will probably indicate time to shutdown, loop will exit normally if flags set.
1749 : }
1750 :
1751 : // ETIMEDOUT just means we should wait more.
1752 : // Otherwise, report an error.
1753 0 : if( errno != ETIMEDOUT )
1754 : {
1755 0 : log<software_error>( { __FILE__, __LINE__, errno, "sem_timedwait" } );
1756 0 : break;
1757 : }
1758 : }
1759 : } // outer loop, will exit if m_shutdown==true
1760 : }
1761 :
1762 4 : int streamWriter::doEncode()
1763 : {
1764 4 : if( m_writing == NOT_WRITING )
1765 : {
1766 0 : return 0;
1767 : }
1768 :
1769 4 : recordSavingState( true );
1770 :
1771 : // Record these to prevent a change in other thread
1772 4 : uint64_t saveStart = m_currSaveStart;
1773 4 : uint64_t saveStopFrameNo = m_currSaveStopFrameNo;
1774 4 : size_t nFrames = m_currSaveStop - saveStart;
1775 4 : size_t nBytes = m_width * m_height * m_typeSize;
1776 :
1777 : #ifdef SW_DEBUG
1778 : std::cerr << "nFrames: " << nFrames << "\n";
1779 : #endif
1780 :
1781 4 : if( nFrames == 0 ) // can happend during a stop. just clean up but don't try to write nothting.
1782 : {
1783 : #ifdef SW_DEBUG
1784 : std::cerr << "nothing to write\n";
1785 : #endif
1786 :
1787 0 : recordSavingStats( true );
1788 :
1789 0 : if( m_writing == STOP_WRITING )
1790 : {
1791 0 : m_writing = NOT_WRITING;
1792 0 : log<saving_stop>( { 0, saveStopFrameNo } );
1793 : }
1794 :
1795 0 : recordSavingState( true );
1796 :
1797 0 : return 0;
1798 : }
1799 : // Configure xrif and copy image data -- this does no allocations
1800 4 : int rv = xrif_set_size( m_xrif, m_width, m_height, 1, nFrames, m_dataType );
1801 4 : if( rv != XRIF_NOERROR )
1802 : {
1803 : // This is a big problem. Report it as "ALERT" and go on.
1804 0 : log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST" } );
1805 : }
1806 :
1807 4 : rv = xrif_set_lz4_acceleration( m_xrif, m_lz4accel );
1808 4 : if( rv != XRIF_NOERROR )
1809 : {
1810 : // This may just be out of range, it's only an error.
1811 0 : log<software_error>( { __FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error." } );
1812 : }
1813 :
1814 4 : memcpy( m_xrif->raw_buffer, m_rawImageCircBuff + saveStart * nBytes, nFrames * nBytes );
1815 :
1816 : // Configure xrif and copy timing data -- no allocations
1817 4 : rv = xrif_set_size( m_xrif_timing, 5, 1, 1, nFrames, XRIF_TYPECODE_UINT64 );
1818 4 : if( rv != XRIF_NOERROR )
1819 : {
1820 : // This is a big problem. Report it as "ALERT" and go on.
1821 0 : log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST." } );
1822 : }
1823 :
1824 4 : rv = xrif_set_lz4_acceleration( m_xrif_timing, m_lz4accel );
1825 4 : if( rv != XRIF_NOERROR )
1826 : {
1827 : // This may just be out of range, it's only an error.
1828 0 : log<software_error>( { __FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error." } );
1829 : }
1830 :
1831 : #ifdef SW_DEBUG
1832 : for( size_t nF = 0; nF < nFrames; ++nF )
1833 : {
1834 : std::cerr << " " << ( m_timingCircBuff + saveStart * 5 + nF * 5 )[0] << "\n";
1835 : }
1836 : #endif
1837 :
1838 4 : memcpy( m_xrif_timing->raw_buffer, m_timingCircBuff + saveStart * 5, nFrames * 5 * sizeof( uint64_t ) );
1839 :
1840 4 : rv = xrif_encode( m_xrif );
1841 4 : if( rv != XRIF_NOERROR )
1842 : {
1843 : // This is a big problem. Report it as "ALERT" and go on.
1844 0 : log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST." } );
1845 : }
1846 :
1847 4 : rv = xrif_write_header( m_xrif_header, m_xrif );
1848 4 : if( rv != XRIF_NOERROR )
1849 : {
1850 : // This is a big problem. Report it as "ALERT" and go on.
1851 0 : log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST." } );
1852 : }
1853 :
1854 4 : rv = xrif_encode( m_xrif_timing );
1855 4 : if( rv != XRIF_NOERROR )
1856 : {
1857 : // This is a big problem. Report it as "ALERT" and go on.
1858 0 : log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST." } );
1859 : }
1860 :
1861 4 : rv = xrif_write_header( m_xrif_timing_header, m_xrif_timing );
1862 4 : if( rv != XRIF_NOERROR )
1863 : {
1864 : // This is a big problem. Report it as "ALERT" and go on.
1865 0 : log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST" } );
1866 : }
1867 :
1868 : // Now break down the acq time of the first image in the buffer for use in file name
1869 : // tm uttime; // The broken down time.
1870 4 : timespec *fts = reinterpret_cast<timespec *>( m_timingCircBuff + saveStart * 5 + 1 );
1871 :
1872 4 : std::string fileName;
1873 4 : std::string relPath;
1874 8 : mx::error_t errc = file::fileTimeRelPath( fileName, relPath, m_outName, "xrif", fts->tv_sec, fts->tv_nsec );
1875 4 : if(errc != mx::error_t::noerror)
1876 : {
1877 0 : std::string msg = "error from file::fileTimeRePath: ";
1878 0 : msg += mx::errorMessage(errc);
1879 0 : msg += " (" + std::string(mx::errorName(errc)) + ")";
1880 0 : return log<software_error, -1>( { __FILE__, __LINE__, msg } );
1881 0 : }
1882 :
1883 4 : std::string fullPath = m_rawimageDir + '/' + relPath;
1884 :
1885 : try
1886 : {
1887 4 : std::filesystem::create_directories( fullPath ); // this does nothing if fname already exists
1888 : }
1889 0 : catch( const std::filesystem::filesystem_error &e )
1890 : {
1891 0 : std::string msg = "filesystem_error from std::create_directories. ";
1892 0 : msg += e.what();
1893 0 : msg += " code: ";
1894 0 : msg += e.code().value();
1895 0 : return log<software_critical, -1>( { __FILE__, __LINE__, msg } );
1896 0 : }
1897 0 : catch( const std::exception &e )
1898 : {
1899 0 : std::string msg = "exception from std::create_directories. ";
1900 0 : msg += e.what();
1901 0 : return log<software_critical, -1>( { __FILE__, __LINE__, msg } );
1902 0 : }
1903 :
1904 4 : m_outFilePath = fullPath + '/' + fileName;
1905 4 : FILE *fp_xrif = fopen( m_outFilePath.c_str(), "wb" );
1906 4 : if( fp_xrif == NULL )
1907 : {
1908 : // This is it. If we can't write data to disk need to fix.
1909 0 : return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "failed to open file for writing" } );
1910 : }
1911 :
1912 4 : size_t bw = fwrite( m_xrif_header, sizeof( uint8_t ), XRIF_HEADER_SIZE, fp_xrif );
1913 :
1914 4 : if( bw != XRIF_HEADER_SIZE )
1915 : {
1916 0 : log<software_alert>( { __FILE__,
1917 : __LINE__,
1918 0 : errno,
1919 : 0,
1920 0 : "failure writing header to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1921 : // We go on . . .
1922 : }
1923 :
1924 4 : bw = fwrite( m_xrif->raw_buffer, sizeof( uint8_t ), m_xrif->compressed_size, fp_xrif );
1925 :
1926 4 : if( bw != m_xrif->compressed_size )
1927 : {
1928 0 : log<software_alert>( { __FILE__,
1929 : __LINE__,
1930 0 : errno,
1931 : 0,
1932 0 : "failure writing data to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1933 : }
1934 :
1935 4 : bw = fwrite( m_xrif_timing_header, sizeof( uint8_t ), XRIF_HEADER_SIZE, fp_xrif );
1936 :
1937 4 : if( bw != XRIF_HEADER_SIZE )
1938 : {
1939 0 : log<software_alert>(
1940 : { __FILE__,
1941 : __LINE__,
1942 0 : errno,
1943 : 0,
1944 0 : "failure writing timing header to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1945 : }
1946 :
1947 4 : bw = fwrite( m_xrif_timing->raw_buffer, sizeof( uint8_t ), m_xrif_timing->compressed_size, fp_xrif );
1948 :
1949 4 : if( bw != m_xrif_timing->compressed_size )
1950 : {
1951 0 : log<software_alert>(
1952 : { __FILE__,
1953 : __LINE__,
1954 0 : errno,
1955 : 0,
1956 0 : "failure writing timing data to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
1957 : }
1958 :
1959 4 : fclose( fp_xrif );
1960 :
1961 4 : recordSavingStats( true );
1962 :
1963 4 : if( m_writing == STOP_WRITING )
1964 : {
1965 0 : m_writing = NOT_WRITING;
1966 0 : log<saving_stop>( { 0, saveStopFrameNo } );
1967 : }
1968 :
1969 4 : recordSavingState( true );
1970 :
1971 4 : return 0;
1972 :
1973 4 : } // doEncode
1974 :
1975 3 : INDI_NEWCALLBACK_DEFN( streamWriter, m_indiP_writing )
1976 : ( const pcf::IndiProperty &ipRecv )
1977 : {
1978 3 : INDI_VALIDATE_CALLBACK_PROPS( m_indiP_writing, ipRecv );
1979 :
1980 : if( !ipRecv.find( "toggle" ) )
1981 : {
1982 : return 0;
1983 : }
1984 :
1985 : if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::Off &&
1986 : ( m_writing == WRITING || m_writing == START_WRITING ) )
1987 : {
1988 : m_writing = STOP_WRITING;
1989 : }
1990 :
1991 : if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::On && m_writing == NOT_WRITING )
1992 : {
1993 : m_writing = START_WRITING;
1994 : }
1995 :
1996 : return 0;
1997 : }
1998 :
1999 0 : void streamWriter::updateINDI()
2000 : {
2001 : // Only update this if not changing
2002 0 : if( m_writing == NOT_WRITING || m_writing == WRITING )
2003 : {
2004 0 : if( m_xrif && m_writing == WRITING )
2005 : {
2006 0 : indi::updateSwitchIfChanged( m_indiP_writing, "toggle", pcf::IndiElement::On, m_indiDriver, INDI_OK );
2007 0 : indi::updateIfChanged( m_indiP_xrifStats, "ratio", m_xrif->compression_ratio, m_indiDriver, INDI_BUSY );
2008 0 : indi::updateIfChanged(
2009 0 : m_indiP_xrifStats, "encodeMBsec", m_xrif->encode_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2010 0 : indi::updateIfChanged( m_indiP_xrifStats,
2011 : "encodeFPS",
2012 0 : m_xrif->encode_rate / ( m_width * m_height * m_typeSize ),
2013 : m_indiDriver,
2014 : INDI_BUSY );
2015 0 : indi::updateIfChanged(
2016 0 : m_indiP_xrifStats, "differenceMBsec", m_xrif->difference_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2017 0 : indi::updateIfChanged( m_indiP_xrifStats,
2018 : "differenceFPS",
2019 0 : m_xrif->difference_rate / ( m_width * m_height * m_typeSize ),
2020 : m_indiDriver,
2021 : INDI_BUSY );
2022 0 : indi::updateIfChanged(
2023 0 : m_indiP_xrifStats, "reorderMBsec", m_xrif->reorder_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2024 0 : indi::updateIfChanged( m_indiP_xrifStats,
2025 : "reorderFPS",
2026 0 : m_xrif->reorder_rate / ( m_width * m_height * m_typeSize ),
2027 : m_indiDriver,
2028 : INDI_BUSY );
2029 0 : indi::updateIfChanged(
2030 0 : m_indiP_xrifStats, "compressMBsec", m_xrif->compress_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2031 0 : indi::updateIfChanged( m_indiP_xrifStats,
2032 : "compressFPS",
2033 0 : m_xrif->compress_rate / ( m_width * m_height * m_typeSize ),
2034 : m_indiDriver,
2035 : INDI_BUSY );
2036 : }
2037 : else
2038 : {
2039 0 : indi::updateSwitchIfChanged( m_indiP_writing, "toggle", pcf::IndiElement::Off, m_indiDriver, INDI_OK );
2040 0 : indi::updateIfChanged( m_indiP_xrifStats, "ratio", 0.0, m_indiDriver, INDI_IDLE );
2041 0 : indi::updateIfChanged( m_indiP_xrifStats, "encodeMBsec", 0.0, m_indiDriver, INDI_IDLE );
2042 0 : indi::updateIfChanged( m_indiP_xrifStats, "encodeFPS", 0.0, m_indiDriver, INDI_IDLE );
2043 0 : indi::updateIfChanged( m_indiP_xrifStats, "differenceMBsec", 0.0, m_indiDriver, INDI_IDLE );
2044 0 : indi::updateIfChanged( m_indiP_xrifStats, "differenceFPS", 0.0, m_indiDriver, INDI_IDLE );
2045 0 : indi::updateIfChanged( m_indiP_xrifStats, "reorderMBsec", 0.0, m_indiDriver, INDI_IDLE );
2046 0 : indi::updateIfChanged( m_indiP_xrifStats, "reorderFPS", 0.0, m_indiDriver, INDI_IDLE );
2047 0 : indi::updateIfChanged( m_indiP_xrifStats, "compressMBsec", 0.0, m_indiDriver, INDI_IDLE );
2048 0 : indi::updateIfChanged( m_indiP_xrifStats, "compressFPS", 0.0, m_indiDriver, INDI_IDLE );
2049 : }
2050 : }
2051 0 : }
2052 :
2053 0 : int streamWriter::checkRecordTimes()
2054 : {
2055 0 : return telemeterT::checkRecordTimes( telem_saving_state() );
2056 : }
2057 :
2058 0 : int streamWriter::recordTelem( const telem_saving_state * )
2059 : {
2060 0 : return recordSavingState( true );
2061 : }
2062 :
2063 8 : int streamWriter::recordSavingState( bool force )
2064 : {
2065 : static int16_t lastState = -1;
2066 : static uint64_t currSaveStart = -1;
2067 :
2068 : int16_t state;
2069 8 : if( m_writing == WRITING || m_writing == START_WRITING ||
2070 0 : m_writing == STOP_WRITING ) // Changed from just writing 5/2024
2071 8 : state = 1;
2072 : else
2073 0 : state = 0;
2074 :
2075 8 : if( state != lastState || m_currSaveStart != currSaveStart || force )
2076 : {
2077 8 : telem<telem_saving_state>( { state, m_currSaveStart } );
2078 :
2079 8 : lastState = state;
2080 8 : currSaveStart = m_currSaveStart;
2081 : }
2082 :
2083 8 : return 0;
2084 : }
2085 :
2086 4 : int streamWriter::recordSavingStats( bool force )
2087 : {
2088 : static uint32_t last_rawSize = -1;
2089 : static uint32_t last_compressedSize = -1;
2090 : static float last_encodeRate = -1;
2091 : static float last_differenceRate = -1;
2092 : static float last_reorderRate = -1;
2093 : static float last_compressRate = -1;
2094 :
2095 4 : if( m_xrif->raw_size != last_rawSize || m_xrif->compressed_size != last_compressedSize ||
2096 2 : m_xrif->encode_rate != last_encodeRate || m_xrif->difference_rate != last_differenceRate ||
2097 0 : m_xrif->reorder_rate != last_reorderRate || m_xrif->compress_rate != last_compressRate || force )
2098 : {
2099 4 : telem<telem_saving>( { (uint32_t)m_xrif->raw_size,
2100 0 : (uint32_t)m_xrif->compressed_size,
2101 0 : (float)m_xrif->encode_rate,
2102 0 : (float)m_xrif->difference_rate,
2103 0 : (float)m_xrif->reorder_rate,
2104 4 : (float)m_xrif->compress_rate } );
2105 :
2106 4 : last_rawSize = m_xrif->raw_size;
2107 4 : last_compressedSize = m_xrif->compressed_size;
2108 4 : last_encodeRate = m_xrif->encode_rate;
2109 4 : last_differenceRate = m_xrif->difference_rate;
2110 4 : last_reorderRate = m_xrif->reorder_rate;
2111 4 : last_compressRate = m_xrif->compress_rate;
2112 : }
2113 :
2114 4 : return 0;
2115 : }
2116 :
2117 : } // namespace app
2118 : } // namespace MagAOX
2119 :
2120 : #endif
|