Line data Source code
1 : /** \file streamWriter.hpp
2 : * \brief The MagAO-X Image Stream Writer
3 : *
4 : * \author Jared R. Males (jaredmales@gmail.com)
5 : *
6 : * \ingroup streamWriter_files
7 : */
8 :
9 : #ifndef streamWriter_hpp
10 : #define streamWriter_hpp
11 :
12 : #include <atomic>
13 : #include <filesystem>
14 :
15 : #include <ImageStreamIO/ImageStruct.h>
16 : #include <ImageStreamIO/ImageStreamIO.h>
17 :
18 : #include <xrif/xrif.h>
19 :
20 : #include <mx/sys/timeUtils.hpp>
21 :
22 : #include "../../libMagAOX/libMagAOX.hpp" //Note this is included on command line to trigger pch
23 : #include "../../magaox_git_version.h"
24 :
25 : #define NOT_WRITING ( 0 )
26 : #define START_WRITING ( 1 )
27 : #define WRITING ( 2 )
28 : #define STOP_WRITING ( 3 )
29 :
30 : // #define SW_DEBUG
31 :
32 : namespace MagAOX
33 : {
34 : namespace app
35 : {
36 :
37 : /** \defgroup streamWriter ImageStreamIO Stream Writing
38 : * \brief Writes the contents of an ImageStreamIO image stream to disk.
39 : *
40 : * <a href="../handbook/operating/software/apps/streamWriter.html">Application Documentation</a>
41 : *
42 : * \ingroup apps
43 : *
44 : */
45 :
46 : /** \defgroup streamWriter_files ImageStreamIO Stream Writing
47 : * \ingroup streamWriter
48 : */
49 :
50 : /** MagAO-X application to control writing ImageStreamIO streams to disk.
51 : *
52 : * \ingroup streamWriter
53 : *
54 : */
55 : class streamWriter : public MagAOXApp<>, public dev::telemeter<streamWriter>
56 : {
57 : typedef dev::telemeter<streamWriter> telemeterT;
58 :
59 : friend class dev::telemeter<streamWriter>;
60 :
61 : // Give the test harness access.
62 : friend class streamWriter_test;
63 : friend class streamWriter_data_test;
64 :
65 : protected:
66 : /** \name configurable parameters
67 : *@{
68 : */
69 :
70 : std::string m_rawimageDir; ///< The path where files will be saved.
71 :
72 : size_t m_maxCircBuffLength{ 1024 }; ///< The maximum length of the circular buffer, in frames
73 :
74 : double m_maxCircBuffSize{ 2048 }; ///< The maximum size of the circular bufffer in MB.
75 :
76 : size_t m_maxWriteChunkLength{ 512 }; /**< The maximum number of frames to write at a time. Must
77 : be an integer factor of m_maxCircBuffLength.*/
78 :
79 : double m_maxChunkTime{ 10 }; ///< The maximum time before writing regardless of number of frames.
80 :
81 : std::string m_shmimName; ///< The name of the shared memory buffer.
82 :
83 : std::string m_outName; ///< The name to use for outputting files, Default is m_shmimName.
84 :
85 : int m_semaphoreNumber{ 7 }; ///< The image structure semaphore index.
86 :
87 : unsigned m_semWaitSec{ 0 }; /**< The time in whole sec to wait on the semaphore,
88 : to which m_semWaitNSec is added. Default is 0 nsec.*/
89 :
90 : unsigned m_semWaitNSec{ 500000000 }; /**< The time in nsec to wait on the semaphore, added to m_semWaitSec.
91 : Max is 999999999. Default is 5e8 nsec. */
92 :
93 : bool m_warnMissedData{
94 : true }; ///< Whether missed-data backlog summaries should be emitted as warnings instead of informational logs.
95 :
96 : int m_lz4accel{ 1 };
97 :
98 : bool m_compress{ true };
99 :
100 : ///@}
101 :
102 : size_t m_circBuffLength{ 1024 }; ///< The length of the circular buffer, in frames
103 : double m_circBuffSize{ 2048.0 }; ///< The size of the circular buffer, in MB
104 : size_t m_writeChunkLength{ 512 }; ///< The number of frames to write at a time
105 :
106 : size_t m_width{ 0 }; ///< The width of the image
107 : size_t m_height{ 0 }; ///< The height of the image
108 : uint8_t m_dataType{ 0 }; ///< The ImageStreamIO type code.
109 : int m_typeSize{ 0 }; ///< The pixel byte depth
110 :
111 : char *m_rawImageCircBuff{ nullptr };
112 : uint64_t *m_timingCircBuff{ nullptr };
113 :
114 : size_t m_currImage{ 0 };
115 :
116 : uint64_t m_currImageTime{ 0 }; ///< The write-time of the current image in nanoseconds.
117 :
118 : uint64_t m_currChunkStartTime{ 0 }; ///< The write-time of the first image in the chunk in nanoseconds.
119 :
120 : std::atomic<uint64_t> m_skippedFrameCount{
121 : 0 }; ///< Count of skipped frames accumulated by the framegrabber thread since the last summary log.
122 :
123 : std::atomic<uint64_t> m_repeatSemaphoreCount{
124 : 0 }; ///< Count of repeated semaphore wakes with unchanged frame count since the last summary log.
125 :
126 : double m_skipSummaryIntervalSec{ 10.0 }; ///< Current interval between summary skip logs.
127 :
128 : double m_nextSkipSummaryTime{ 0.0 }; ///< Time after which the next summary skip log may be emitted.
129 :
130 : // Writer book-keeping:
131 : int m_writing{ NOT_WRITING }; /**< Controls whether or not images are being written,
132 : and sequences start and stop of writing.*/
133 :
134 : uint64_t m_currChunkStart{ 0 }; ///< The circular buffer starting position of the current to-be-written chunk.
135 : uint64_t m_nextChunkStart{ 0 }; ///< The circular buffer starting position of the next to-be-written chunk.
136 :
137 : uint64_t m_currSaveStart{ 0 }; ///< The circular buffer position at which to start saving.
138 : uint64_t m_currSaveStop{ 0 }; ///< The circular buffer position at which to stop saving.
139 :
140 : uint64_t m_currSaveStopFrameNo{ 0 }; ///< The frame number of the image at which saving stopped (for logging)
141 :
142 : /// The xrif compression handle for image data
143 : xrif_t m_xrif{ nullptr };
144 :
145 : /// Storage for the xrif image data file header
146 : char *m_xrif_header{ nullptr };
147 :
148 : /// The xrif compression handle for image data
149 : xrif_t m_xrif_timing{ nullptr };
150 :
151 : /// Storage for the xrif image data file header
152 : char *m_xrif_timing_header{ nullptr };
153 :
154 : std::string m_outFilePath; ///< The full path for the latest output file
155 :
156 : public:
157 : /// Default c'tor
158 : streamWriter();
159 :
160 : /// Destructor
161 : ~streamWriter() noexcept;
162 :
163 : /// Setup the configuration system (called by MagAOXApp::setup())
164 : virtual void setupConfig();
165 :
166 : /// load the configuration system results (called by MagAOXApp::setup())
167 : virtual void loadConfig();
168 :
169 : /// Startup functions
170 : /** Sets up the INDI vars.
171 : *
172 : */
173 : virtual int appStartup();
174 :
175 : /// Implementation of the FSM for the Siglent SDG
176 : virtual int appLogic();
177 :
178 : /// Do any needed shutdown tasks. Currently nothing in this app.
179 : virtual int appShutdown();
180 :
181 : protected:
182 : /** \name SIGSEGV & SIGBUS signal handling
183 : * These signals occur as a result of a ImageStreamIO source server resetting (e.g. changing frame sizes).
184 : * When they occur a restart of the framegrabber and framewriter thread main loops is triggered.
185 : *
186 : * @{
187 : */
188 : bool m_restart{ false };
189 :
190 : static streamWriter *m_selfWriter; ///< Static pointer to this (set in constructor). Used for getting out of the
191 : ///< static SIGSEGV handler.
192 :
193 : /// Initialize the xrif system.
194 : /** Allocates the handles and headers pointers.
195 : *
196 : * \returns 0 on success.
197 : * \returns -1 on error.
198 : */
199 : int initialize_xrif();
200 :
201 : /// Sets the handler for SIGSEGV and SIGBUS
202 : /** These are caused by ImageStreamIO server resets.
203 : */
204 : int setSigSegvHandler();
205 :
206 : /// The handler called when SIGSEGV or SIGBUS is received, which will be due to ImageStreamIO server resets. Just a
207 : /// wrapper for handlerSigSegv.
208 : static void _handlerSigSegv( int signum, siginfo_t *siginf, void *ucont );
209 :
210 : /// Handles SIGSEGV and SIGBUS. Sets m_restart to true.
211 : void handlerSigSegv( int signum, siginfo_t *siginf, void *ucont );
212 : ///@}
213 :
214 : /** \name Framegrabber Thread
215 : * This thread monitors the ImageStreamIO buffer and copies its images to the circular buffer.
216 : *
217 : * @{
218 : */
219 : int m_fgThreadPrio{ 1 }; ///< Priority of the framegrabber thread, should normally be > 00.
220 :
221 : std::string m_fgCpuset; ///< The cpuset for the framegrabber thread. Ignored if empty (the default).
222 :
223 : std::thread m_fgThread; ///< A separate thread for the actual framegrabbings
224 :
225 : bool m_fgThreadInit{ true }; ///< Synchronizer to ensure f.g. thread initializes before doing dangerous things.
226 :
227 : pid_t m_fgThreadID{ 0 }; ///< F.g. thread PID.
228 :
229 : pcf::IndiProperty m_fgThreadProp; ///< The property to hold the f.g. thread details.
230 :
231 : public:
232 : static void getCircBuffLengths( size_t &circBuffLength,
233 : double &circBuffSize,
234 : size_t &writeChunkLength,
235 : size_t maxCircBuffLength,
236 : double maxCircBuffSize,
237 : size_t maxWriteChunkLength,
238 : uint32_t width,
239 : uint32_t height,
240 : size_t typeSize );
241 :
242 : protected:
243 : /// Worker function to allocate the circular buffers.
244 : /** This takes place in the fg thread after connecting to the stream.
245 : *
246 : * \returns 0 on sucess.
247 : * \returns -1 on error.
248 : */
249 : int allocate_circbufs();
250 :
251 : /// Worker function to configure and allocate the xrif handles.
252 : /** This takes place in the fg thread after connecting to the stream.
253 : *
254 : * \returns 0 on sucess.
255 : * \returns -1 on error.
256 : */
257 : int allocate_xrif();
258 :
259 : /// Thread starter, called by fgThreadStart on thread construction. Calls fgThreadExec.
260 : static void fgThreadStart( streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */ );
261 :
262 : /// Execute the frame grabber main loop.
263 : void fgThreadExec();
264 :
265 : ///@}
266 :
267 : /** \name Stream Writer Thread
268 : * This thread writes chunks of the circular buffer to disk.
269 : *
270 : * @{
271 : */
272 : int m_swThreadPrio{ 1 }; ///< Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
273 :
274 : std::string m_swCpuset; ///< The cpuset for the framegrabber thread. Ignored if empty (the default).
275 :
276 : sem_t m_swSemaphore; ///< Semaphore used to synchronize the fg thread and the sw thread.
277 :
278 : std::thread m_swThread; ///< A separate thread for the actual writing
279 :
280 : bool m_swThreadInit{ true }; ///< Synchronizer to ensure s.w. thread initializes before doing dangerous things.
281 :
282 : pid_t m_swThreadID{ 0 }; ///< S.w. thread pid.
283 :
284 : pcf::IndiProperty m_swThreadProp; ///< The property to hold the s.w. thread details.
285 :
286 : /// Thread starter, called by swThreadStart on thread construction. Calls swThreadExec.
287 : static void swThreadStart( streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */ );
288 :
289 : /// Execute the stream writer main loop.
290 : void swThreadExec();
291 :
292 : /// Function called when semaphore is raised to do the encode and write.
293 : int doEncode();
294 : ///@}
295 :
296 : // INDI:
297 : protected:
298 : // declare our properties
299 : pcf::IndiProperty m_indiP_writing;
300 :
301 : pcf::IndiProperty m_indiP_xrifStats;
302 :
303 : public:
304 0 : INDI_NEWCALLBACK_DECL( streamWriter, m_indiP_writing );
305 :
306 : void updateINDI();
307 :
308 : /** \name Telemeter Interface
309 : *
310 : * @{
311 : */
312 : int checkRecordTimes();
313 :
314 : int recordTelem( const telem_saving_state * );
315 :
316 : int recordSavingState( bool force = false );
317 : int recordSavingStats( bool force = false );
318 :
319 : ///@}
320 : };
321 :
322 : // Set self pointer to null so app starts up uninitialized.
323 : streamWriter *streamWriter::m_selfWriter = nullptr;
324 :
325 117 : streamWriter::streamWriter() : MagAOXApp( MAGAOX_CURRENT_SHA1, MAGAOX_REPO_MODIFIED )
326 : {
327 39 : m_powerMgtEnabled = false;
328 :
329 39 : m_selfWriter = this;
330 :
331 39 : return;
332 0 : }
333 :
334 39 : streamWriter::~streamWriter() noexcept
335 : {
336 39 : if( m_xrif )
337 20 : xrif_delete( m_xrif );
338 :
339 39 : if( m_xrif_header )
340 28 : free( m_xrif_header );
341 :
342 39 : if( m_xrif_timing )
343 20 : xrif_delete( m_xrif_timing );
344 :
345 39 : if( m_xrif_timing_header )
346 28 : free( m_xrif_timing_header );
347 :
348 39 : return;
349 39 : }
350 :
351 12 : void streamWriter::setupConfig()
352 : {
353 168 : config.add( "writer.savePath",
354 : "",
355 : "writer.savePath",
356 : argType::Required,
357 : "writer",
358 : "savePath",
359 : false,
360 : "string",
361 : "The absolute path where images are saved. Will use MagAO-X default if not set." );
362 :
363 168 : config.add( "writer.maxCircBuffLength",
364 : "",
365 : "writer.maxCircBuffLength",
366 : argType::Required,
367 : "writer",
368 : "maxCircBuffLength",
369 : false,
370 : "size_t",
371 : "The maximum length in frames of the circular buffer. Should be an integer multiple of and larger than "
372 : "maxWriteChunkLength." );
373 :
374 168 : config.add( "writer.maxCircBuffSize",
375 : "",
376 : "writer.maxCircBuffSize",
377 : argType::Required,
378 : "writer",
379 : "maxCircBuffSize",
380 : false,
381 : "double",
382 : "The maximum size in MB of the circular buffer. Should be sized to hold at least 2 of the maximum "
383 : "frame size." );
384 :
385 168 : config.add(
386 : "writer.maxWriteChunkLength",
387 : "",
388 : "writer.maxWriteChunkLength",
389 : argType::Required,
390 : "writer",
391 : "maxWriteChunkLength",
392 : false,
393 : "size_t",
394 : "The maximum length in frames of the chunks to write to disk. Should be smaller than maxCircBuffLength." );
395 :
396 168 : config.add( "writer.maxChunkTime",
397 : "",
398 : "writer.maxChunkTime",
399 : argType::Required,
400 : "writer",
401 : "maxChunkTime",
402 : false,
403 : "float",
404 : "The max length in seconds of the chunks to write to disk. Default is 60 sec." );
405 :
406 168 : config.add( "writer.threadPrio",
407 : "",
408 : "writer.threadPrio",
409 : argType::Required,
410 : "writer",
411 : "threadPrio",
412 : false,
413 : "int",
414 : "The real-time priority of the stream writer thread." );
415 :
416 168 : config.add( "writer.cpuset",
417 : "",
418 : "writer.cpuset",
419 : argType::Required,
420 : "writer",
421 : "cpuset",
422 : false,
423 : "int",
424 : "The cpuset for the writer thread." );
425 :
426 168 : config.add( "writer.compress",
427 : "",
428 : "writer.compress",
429 : argType::Required,
430 : "writer",
431 : "compress",
432 : false,
433 : "bool",
434 : "Flag to set whether compression is used. Default true." );
435 :
436 168 : config.add( "writer.lz4accel",
437 : "",
438 : "writer.lz4accel",
439 : argType::Required,
440 : "writer",
441 : "lz4accel",
442 : false,
443 : "int",
444 : "The LZ4 acceleration parameter. Larger is faster, but lower compression." );
445 :
446 168 : config.add( "writer.outName",
447 : "",
448 : "writer.outName",
449 : argType::Required,
450 : "writer",
451 : "outName",
452 : false,
453 : "int",
454 : "The name to use for output files. Default is the shmimName." );
455 :
456 168 : config.add( "framegrabber.shmimName",
457 : "",
458 : "framegrabber.shmimName",
459 : argType::Required,
460 : "framegrabber",
461 : "shmimName",
462 : false,
463 : "int",
464 : "The name of the stream to monitor. From /tmp/shmimName.im.shm." );
465 :
466 168 : config.add( "framegrabber.semaphoreNumber",
467 : "",
468 : "framegrabber.semaphoreNumber",
469 : argType::Required,
470 : "framegrabber",
471 : "semaphoreNumber",
472 : false,
473 : "int",
474 : "The semaphore to wait on. Default is 7." );
475 :
476 168 : config.add( "framegrabber.semWait",
477 : "",
478 : "framegrabber.semWait",
479 : argType::Required,
480 : "framegrabber",
481 : "semWait",
482 : false,
483 : "int",
484 : "The time in nsec to wait on the semaphore. Max is 999999999. Default is 5e8 nsec." );
485 :
486 168 : config.add( "framegrabber.warnMissedData",
487 : "",
488 : "framegrabber.warnMissedData",
489 : argType::Required,
490 : "framegrabber",
491 : "warnMissedData",
492 : false,
493 : "bool",
494 : "Whether missed-data backlog summaries should be emitted at warning priority. Default is true." );
495 :
496 168 : config.add( "framegrabber.threadPrio",
497 : "",
498 : "framegrabber.threadPrio",
499 : argType::Required,
500 : "framegrabber",
501 : "threadPrio",
502 : false,
503 : "int",
504 : "The real-time priority of the framegrabber thread." );
505 :
506 168 : config.add( "framegrabber.cpuset",
507 : "",
508 : "framegrabber.cpuset",
509 : argType::Required,
510 : "framegrabber",
511 : "cpuset",
512 : false,
513 : "string",
514 : "The cpuset for the framegrabber thread." );
515 :
516 12 : telemeterT::setupConfig( config );
517 12 : }
518 :
519 12 : void streamWriter::loadConfig()
520 : {
521 :
522 24 : config( m_maxCircBuffLength, "writer.maxCircBuffLength" );
523 24 : config( m_maxCircBuffSize, "writer.maxCircBuffSize" );
524 24 : config( m_maxWriteChunkLength, "writer.maxWriteChunkLength" );
525 24 : config( m_maxChunkTime, "writer.maxChunkTime" );
526 24 : config( m_swThreadPrio, "writer.threadPrio" );
527 24 : config( m_swCpuset, "writer.cpuset" );
528 24 : config( m_compress, "writer.compress" );
529 24 : config( m_lz4accel, "writer.lz4accel" );
530 12 : if( m_lz4accel < XRIF_LZ4_ACCEL_MIN )
531 : {
532 1 : m_lz4accel = XRIF_LZ4_ACCEL_MIN;
533 : }
534 12 : if( m_lz4accel > XRIF_LZ4_ACCEL_MAX )
535 : {
536 1 : m_lz4accel = XRIF_LZ4_ACCEL_MAX;
537 : }
538 :
539 24 : config( m_shmimName, "framegrabber.shmimName" );
540 :
541 12 : m_outName = m_shmimName;
542 24 : config( m_outName, "writer.outName" );
543 :
544 24 : config( m_semaphoreNumber, "framegrabber.semaphoreNumber" );
545 24 : config( m_semWaitNSec, "framegrabber.semWait" );
546 24 : config( m_warnMissedData, "framegrabber.warnMissedData" );
547 :
548 24 : config( m_fgThreadPrio, "framegrabber.threadPrio" );
549 36 : config( m_fgCpuset, "framegrabber.cpuset" );
550 :
551 : // Set some defaults
552 : // Setup default saving path
553 12 : std::string tmpstr = mx::sys::getEnv( MAGAOX_env_rawimage );
554 12 : if( tmpstr == "" )
555 : {
556 12 : tmpstr = MAGAOX_rawimageRelPath;
557 : }
558 12 : m_rawimageDir = basePath() + "/" + tmpstr;
559 :
560 24 : config( m_rawimageDir, "writer.savePath" );
561 :
562 12 : if( telemeterT::loadConfig( config ) < 0 )
563 : {
564 0 : log<text_log>( "Error during telemeter config", logPrio::LOG_CRITICAL );
565 0 : m_shutdown = true;
566 : }
567 12 : }
568 :
569 5 : int streamWriter::appStartup()
570 : {
571 : // Create save directory.
572 5 : errno = 0;
573 5 : if( mkdir( m_rawimageDir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH ) < 0 )
574 : {
575 1 : if( errno != EEXIST )
576 : {
577 1 : std::stringstream logss;
578 1 : logss << "Failed to create image directory (" << m_rawimageDir << "). Errno says: " << strerror( errno );
579 1 : log<software_critical>( { __FILE__, __LINE__, errno, 0, logss.str() } );
580 :
581 1 : return -1;
582 1 : }
583 : }
584 :
585 : // set up the INDI properties
586 24 : createStandardIndiToggleSw( m_indiP_writing, "writing" );
587 4 : registerIndiPropertyNew( m_indiP_writing, INDI_NEWCALLBACK( m_indiP_writing ) );
588 :
589 : // Register the stats INDI property
590 12 : REG_INDI_NEWPROP_NOCB( m_indiP_xrifStats, "xrif", pcf::IndiProperty::Number );
591 12 : m_indiP_xrifStats.setLabel( "xrif compression performance" );
592 :
593 24 : indi::addNumberElement<float>( m_indiP_xrifStats, "ratio", 0, 1.0, 0.0, "%0.2f", "Compression Ratio" );
594 :
595 20 : indi::addNumberElement<float>( m_indiP_xrifStats,
596 : "differenceMBsec",
597 : 0,
598 4 : std::numeric_limits<float>::max(),
599 8 : 0.0,
600 : "%0.2f",
601 : "Differencing Rate [MB/sec]" );
602 :
603 20 : indi::addNumberElement<float>( m_indiP_xrifStats,
604 : "reorderMBsec",
605 : 0,
606 4 : std::numeric_limits<float>::max(),
607 8 : 0.0,
608 : "%0.2f",
609 : "Reordering Rate [MB/sec]" );
610 :
611 20 : indi::addNumberElement<float>( m_indiP_xrifStats,
612 : "compressMBsec",
613 : 0,
614 4 : std::numeric_limits<float>::max(),
615 8 : 0.0,
616 : "%0.2f",
617 : "Compression Rate [MB/sec]" );
618 :
619 20 : indi::addNumberElement<float>( m_indiP_xrifStats,
620 : "encodeMBsec",
621 : 0,
622 4 : std::numeric_limits<float>::max(),
623 8 : 0.0,
624 : "%0.2f",
625 : "Total Encoding Rate [MB/sec]" );
626 :
627 20 : indi::addNumberElement<float>( m_indiP_xrifStats,
628 : "differenceFPS",
629 : 0,
630 4 : std::numeric_limits<float>::max(),
631 8 : 0.0,
632 : "%0.2f",
633 : "Differencing Rate [f.p.s.]" );
634 :
635 20 : indi::addNumberElement<float>( m_indiP_xrifStats,
636 : "reorderFPS",
637 : 0,
638 4 : std::numeric_limits<float>::max(),
639 8 : 0.0,
640 : "%0.2f",
641 : "Reordering Rate [f.p.s.]" );
642 :
643 20 : indi::addNumberElement<float>( m_indiP_xrifStats,
644 : "compressFPS",
645 : 0,
646 4 : std::numeric_limits<float>::max(),
647 8 : 0.0,
648 : "%0.2f",
649 : "Compression Rate [f.p.s.]" );
650 :
651 16 : indi::addNumberElement<float>( m_indiP_xrifStats,
652 : "encodeFPS",
653 : 0,
654 4 : std::numeric_limits<float>::max(),
655 8 : 0.0,
656 : "%0.2f",
657 : "Total Encoding Rate [f.p.s.]" );
658 :
659 : // Now set up the framegrabber and writer threads.
660 : // - need SIGSEGV and SIGBUS handling for ImageStreamIO restarts
661 : // - initialize the semaphore
662 : // - start the threads
663 :
664 4 : if( setSigSegvHandler() < 0 )
665 0 : return log<software_error, -1>( { __FILE__, __LINE__ } );
666 :
667 4 : if( sem_init( &m_swSemaphore, 0, 0 ) < 0 )
668 0 : return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "Initializing S.W. semaphore" } );
669 :
670 : // Check if we have a safe writeChunkLengthh
671 4 : if( m_maxCircBuffLength % m_maxWriteChunkLength != 0 )
672 : {
673 3 : return log<software_critical, -1>(
674 1 : { __FILE__, __LINE__, "Write chunk length is not a divisor of circular buffer length." } );
675 : }
676 :
677 3 : if( initialize_xrif() < 0 )
678 : {
679 0 : log<software_critical, -1>( { __FILE__, __LINE__ } );
680 : }
681 :
682 9 : if( threadStart( m_fgThread,
683 3 : m_fgThreadInit,
684 3 : m_fgThreadID,
685 3 : m_fgThreadProp,
686 : m_fgThreadPrio,
687 3 : m_fgCpuset,
688 : "framegrabber",
689 : this,
690 3 : fgThreadStart ) < 0 )
691 : {
692 0 : return log<software_critical, -1>( { __FILE__, __LINE__ } );
693 : }
694 :
695 9 : if( threadStart( m_swThread,
696 3 : m_swThreadInit,
697 3 : m_swThreadID,
698 3 : m_swThreadProp,
699 : m_swThreadPrio,
700 3 : m_swCpuset,
701 : "streamwriter",
702 : this,
703 3 : swThreadStart ) < 0 )
704 : {
705 0 : log<software_critical, -1>( { __FILE__, __LINE__ } );
706 : }
707 :
708 3 : if( telemeterT::appStartup() < 0 )
709 : {
710 0 : return log<software_error, -1>( { __FILE__, __LINE__ } );
711 : }
712 :
713 3 : return 0;
714 : }
715 :
716 5 : int streamWriter::appLogic()
717 : {
718 5 : double now = mx::sys::get_curr_time();
719 5 : if( m_nextSkipSummaryTime == 0 )
720 : {
721 2 : m_nextSkipSummaryTime = now + m_skipSummaryIntervalSec;
722 : }
723 :
724 5 : if( now >= m_nextSkipSummaryTime )
725 : {
726 2 : uint64_t skippedFrames = m_skippedFrameCount.exchange( 0 );
727 2 : uint64_t repeatedSems = m_repeatSemaphoreCount.exchange( 0 );
728 2 : double summaryInterval = m_skipSummaryIntervalSec;
729 :
730 2 : bool shouldLogSkippedFrames = skippedFrames > 0;
731 2 : bool shouldLog = shouldLogSkippedFrames || repeatedSems > 0;
732 :
733 2 : if( shouldLog )
734 : {
735 1 : std::string msg = "stream ingest backlog: ";
736 1 : bool appended = false;
737 :
738 1 : if( shouldLogSkippedFrames )
739 : {
740 1 : msg += std::to_string( skippedFrames ) + " skipped frames";
741 1 : appended = true;
742 : }
743 :
744 1 : if( repeatedSems > 0 )
745 : {
746 1 : if( appended )
747 : {
748 1 : msg += ", ";
749 : }
750 1 : msg += std::to_string( repeatedSems ) + " repeated semaphore wakes";
751 : }
752 1 : msg += " in last " + std::to_string( static_cast<int>( summaryInterval ) ) + " sec";
753 :
754 1 : log<text_log>( msg, m_warnMissedData ? logPrio::LOG_WARNING : logPrio::LOG_INFO );
755 :
756 1 : m_skipSummaryIntervalSec *= 2.0;
757 1 : if( m_skipSummaryIntervalSec > 60.0 )
758 : {
759 0 : m_skipSummaryIntervalSec = 60.0;
760 : }
761 1 : }
762 : else
763 : {
764 1 : m_skipSummaryIntervalSec = 10.0;
765 : }
766 :
767 2 : m_nextSkipSummaryTime = now + m_skipSummaryIntervalSec;
768 : }
769 :
770 : // first do a join check to see if other threads have exited.
771 : // these will throw if the threads are really gone
772 : try
773 : {
774 5 : if( pthread_tryjoin_np( m_fgThread.native_handle(), 0 ) == 0 )
775 : {
776 0 : log<software_error>( { __FILE__, __LINE__, "framegrabber thread has exited" } );
777 0 : return -1;
778 : }
779 : }
780 0 : catch( ... )
781 : {
782 0 : log<software_error>( { __FILE__, __LINE__, "framegrabber thread has exited" } );
783 0 : return -1;
784 0 : }
785 :
786 : try
787 : {
788 5 : if( pthread_tryjoin_np( m_swThread.native_handle(), 0 ) == 0 )
789 : {
790 0 : log<software_error>( { __FILE__, __LINE__, "stream thread has exited" } );
791 0 : return -1;
792 : }
793 : }
794 0 : catch( ... )
795 : {
796 0 : log<software_error>( { __FILE__, __LINE__, "streamwriter thread has exited" } );
797 0 : return -1;
798 0 : }
799 :
800 5 : switch( m_writing )
801 : {
802 4 : case NOT_WRITING:
803 4 : state( stateCodes::READY );
804 4 : break;
805 1 : default:
806 1 : state( stateCodes::OPERATING );
807 : }
808 :
809 5 : if( state() == stateCodes::OPERATING )
810 : {
811 1 : if( telemeterT::appLogic() < 0 )
812 : {
813 0 : log<software_error>( { __FILE__, __LINE__ } );
814 0 : return 0;
815 : }
816 : }
817 :
818 5 : updateINDI();
819 :
820 5 : return 0;
821 : }
822 :
823 3 : int streamWriter::appShutdown()
824 : {
825 3 : m_writing = NOT_WRITING;
826 3 : updateINDI();
827 :
828 : try
829 : {
830 3 : if( m_fgThread.joinable() )
831 : {
832 3 : m_fgThread.join();
833 : }
834 : }
835 0 : catch( ... )
836 : {
837 0 : }
838 :
839 : try
840 : {
841 3 : if( m_swThread.joinable() )
842 : {
843 3 : m_swThread.join();
844 : }
845 : }
846 0 : catch( ... )
847 : {
848 0 : }
849 :
850 3 : if( m_xrif )
851 : {
852 3 : xrif_delete( m_xrif );
853 3 : m_xrif = nullptr;
854 : }
855 :
856 3 : if( m_xrif_timing )
857 : {
858 3 : xrif_delete( m_xrif_timing );
859 3 : m_xrif_timing = nullptr;
860 : }
861 :
862 3 : telemeterT::appShutdown();
863 :
864 3 : return 0;
865 : }
866 :
867 28 : int streamWriter::initialize_xrif()
868 : {
869 28 : xrif_error_t rv = xrif_new( &m_xrif );
870 28 : if( rv != XRIF_NOERROR )
871 : {
872 0 : return log<software_critical, -1>(
873 0 : { __FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error." } );
874 : }
875 :
876 28 : if( m_compress )
877 : {
878 27 : rv = xrif_configure( m_xrif, XRIF_DIFFERENCE_PREVIOUS, XRIF_REORDER_BYTEPACK, XRIF_COMPRESS_LZ4 );
879 27 : if( rv != XRIF_NOERROR )
880 : {
881 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
882 : }
883 : }
884 : else
885 : {
886 1 : std::cerr << "not compressing . . . \n";
887 1 : rv = xrif_configure( m_xrif, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE );
888 1 : if( rv != XRIF_NOERROR )
889 : {
890 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
891 : }
892 : }
893 :
894 28 : errno = 0;
895 :
896 28 : m_xrif_header = reinterpret_cast<char *>( malloc( XRIF_HEADER_SIZE * sizeof( char ) ) );
897 28 : if( m_xrif_header == NULL )
898 : {
899 0 : return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "xrif header allocation failed." } );
900 : }
901 :
902 28 : rv = xrif_new( &m_xrif_timing );
903 28 : if( rv != XRIF_NOERROR )
904 : {
905 0 : return log<software_critical, -1>(
906 0 : { __FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error." } );
907 : }
908 :
909 : // m_xrif_timing->reorder_method = XRIF_REORDER_NONE;
910 28 : rv = xrif_configure( m_xrif_timing, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE );
911 28 : if( rv != XRIF_NOERROR )
912 : {
913 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
914 : }
915 :
916 28 : errno = 0;
917 :
918 28 : m_xrif_timing_header = reinterpret_cast<char *>( malloc( XRIF_HEADER_SIZE * sizeof( char ) ) );
919 28 : if( m_xrif_timing_header == NULL )
920 : {
921 0 : return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "xrif header allocation failed." } );
922 : }
923 :
924 28 : return 0;
925 : }
926 :
927 4 : int streamWriter::setSigSegvHandler()
928 : {
929 : struct sigaction act;
930 : sigset_t set;
931 :
932 4 : act.sa_sigaction = &streamWriter::_handlerSigSegv;
933 4 : act.sa_flags = SA_SIGINFO;
934 4 : sigemptyset( &set );
935 4 : act.sa_mask = set;
936 :
937 4 : errno = 0;
938 4 : if( sigaction( SIGSEGV, &act, 0 ) < 0 )
939 : {
940 0 : std::string logss = "Setting handler for SIGSEGV failed. Errno says: ";
941 0 : logss += strerror( errno );
942 :
943 0 : log<software_error>( { __FILE__, __LINE__, errno, 0, logss } );
944 :
945 0 : return -1;
946 0 : }
947 :
948 4 : errno = 0;
949 4 : if( sigaction( SIGBUS, &act, 0 ) < 0 )
950 : {
951 0 : std::string logss = "Setting handler for SIGBUS failed. Errno says: ";
952 0 : logss += strerror( errno );
953 :
954 0 : log<software_error>( { __FILE__, __LINE__, errno, 0, logss } );
955 :
956 0 : return -1;
957 0 : }
958 :
959 4 : log<text_log>( "Installed SIGSEGV/SIGBUS signal handler.", logPrio::LOG_DEBUG );
960 :
961 4 : return 0;
962 : }
963 :
964 0 : void streamWriter::_handlerSigSegv( int signum, siginfo_t *siginf, void *ucont )
965 : {
966 0 : m_selfWriter->handlerSigSegv( signum, siginf, ucont );
967 0 : }
968 :
969 0 : void streamWriter::handlerSigSegv( int signum, siginfo_t *siginf, void *ucont )
970 : {
971 : static_cast<void>( signum );
972 : static_cast<void>( siginf );
973 : static_cast<void>( ucont );
974 :
975 0 : m_restart = true;
976 :
977 0 : return;
978 : }
979 :
980 37 : void streamWriter::getCircBuffLengths( size_t &circBuffLength,
981 : double &circBuffSize,
982 : size_t &writeChunkLength,
983 : size_t maxCircBuffLength,
984 : double maxCircBuffSize,
985 : size_t maxWriteChunkLength,
986 : uint32_t width,
987 : uint32_t height,
988 : size_t typeSize )
989 : {
990 : static constexpr double MB = 1048576.0;
991 :
992 37 : size_t isz = width * height * typeSize * maxCircBuffLength;
993 :
994 37 : if( isz <= maxCircBuffSize * MB )
995 : {
996 28 : circBuffLength = maxCircBuffLength;
997 28 : circBuffSize = isz / MB;
998 28 : writeChunkLength = maxWriteChunkLength;
999 :
1000 28 : return;
1001 : }
1002 :
1003 9 : circBuffLength = maxCircBuffSize * MB / ( width * height * typeSize );
1004 :
1005 9 : if( circBuffLength % 2 == 1 )
1006 : {
1007 4 : --circBuffLength;
1008 : }
1009 :
1010 9 : circBuffSize = ( width * height * typeSize * circBuffLength ) / MB;
1011 :
1012 9 : writeChunkLength = ( 1.0 * maxWriteChunkLength / maxCircBuffLength ) * circBuffLength;
1013 :
1014 9 : if( circBuffLength == 0 )
1015 : {
1016 3 : return;
1017 : }
1018 :
1019 6 : if( writeChunkLength == 0 )
1020 : {
1021 1 : writeChunkLength = 1;
1022 : }
1023 :
1024 7 : while( circBuffLength % writeChunkLength != 0 )
1025 : {
1026 1 : --writeChunkLength;
1027 : }
1028 :
1029 6 : return;
1030 : }
1031 :
1032 27 : int streamWriter::allocate_circbufs()
1033 : {
1034 :
1035 27 : getCircBuffLengths( m_circBuffLength,
1036 27 : m_circBuffSize,
1037 27 : m_writeChunkLength,
1038 : m_maxCircBuffLength,
1039 : m_maxCircBuffSize,
1040 : m_maxWriteChunkLength,
1041 27 : m_width,
1042 27 : m_height,
1043 27 : m_typeSize );
1044 :
1045 27 : if( m_circBuffLength < 2 )
1046 : {
1047 2 : return log<software_critical, -1>( { __FILE__, __LINE__, "frame size too large to fit in maxCircBuffSize" } );
1048 : }
1049 :
1050 26 : if( m_writeChunkLength >= m_circBuffLength )
1051 : {
1052 0 : return log<software_critical, -1>(
1053 0 : { __FILE__, __LINE__, "writeChunkLength is not smaller than circBuffLength" } );
1054 : }
1055 :
1056 26 : if( m_circBuffLength % m_writeChunkLength != 0 )
1057 : {
1058 0 : return log<software_critical, -1>(
1059 0 : { __FILE__, __LINE__, "writeChunkLength is not an integer factor of circBuffLength" } );
1060 : }
1061 :
1062 26 : std::string msg = "Set circ buff length: " + std::to_string( m_circBuffLength ) + " frames (";
1063 26 : msg += std::to_string( m_circBuffSize ) + " MB). Write chunk length: " + std::to_string( m_writeChunkLength );
1064 26 : msg += " frames.";
1065 :
1066 26 : log<text_log>( msg, logPrio::LOG_NOTICE );
1067 :
1068 26 : if( m_rawImageCircBuff )
1069 : {
1070 0 : free( m_rawImageCircBuff );
1071 : }
1072 :
1073 26 : errno = 0;
1074 26 : m_rawImageCircBuff = reinterpret_cast<char *>( malloc( m_width * m_height * m_typeSize * m_circBuffLength ) );
1075 :
1076 26 : if( m_rawImageCircBuff == NULL )
1077 : {
1078 0 : return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "buffer allocation failure" } );
1079 : }
1080 :
1081 26 : if( m_timingCircBuff )
1082 : {
1083 0 : free( m_timingCircBuff );
1084 : }
1085 :
1086 26 : errno = 0;
1087 26 : m_timingCircBuff = reinterpret_cast<uint64_t *>( malloc( 5 * sizeof( uint64_t ) * m_circBuffLength ) );
1088 26 : if( m_timingCircBuff == NULL )
1089 : {
1090 0 : return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "buffer allocation failure" } );
1091 : }
1092 :
1093 26 : return 0;
1094 26 : }
1095 :
1096 26 : int streamWriter::allocate_xrif()
1097 : {
1098 : // Set up the image data xrif handle
1099 : xrif_error_t rv;
1100 :
1101 26 : if( m_compress )
1102 : {
1103 25 : rv = xrif_configure( m_xrif, XRIF_DIFFERENCE_PREVIOUS, XRIF_REORDER_BYTEPACK, XRIF_COMPRESS_LZ4 );
1104 25 : if( rv != XRIF_NOERROR )
1105 : {
1106 1 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1107 : }
1108 : }
1109 : else
1110 : {
1111 1 : std::cerr << "not compressing . . . \n";
1112 1 : rv = xrif_configure( m_xrif, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE );
1113 1 : if( rv != XRIF_NOERROR )
1114 : {
1115 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1116 : }
1117 : }
1118 :
1119 25 : rv = xrif_set_size( m_xrif, m_width, m_height, 1, m_writeChunkLength, m_dataType );
1120 25 : if( rv != XRIF_NOERROR )
1121 : {
1122 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_set_size error." } );
1123 : }
1124 :
1125 25 : rv = xrif_allocate_raw( m_xrif );
1126 25 : if( rv != XRIF_NOERROR )
1127 : {
1128 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_raw error." } );
1129 : }
1130 :
1131 25 : rv = xrif_allocate_reordered( m_xrif );
1132 25 : if( rv != XRIF_NOERROR )
1133 : {
1134 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error." } );
1135 : }
1136 :
1137 : // Set up the timing data xrif handle
1138 25 : rv = xrif_configure( m_xrif_timing, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE );
1139 25 : if( rv != XRIF_NOERROR )
1140 : {
1141 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
1142 : }
1143 :
1144 25 : rv = xrif_set_size( m_xrif_timing, 5, 1, 1, m_writeChunkLength, XRIF_TYPECODE_UINT64 );
1145 25 : if( rv != XRIF_NOERROR )
1146 : {
1147 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_set_size error." } );
1148 : }
1149 :
1150 25 : rv = xrif_allocate_raw( m_xrif_timing );
1151 25 : if( rv != XRIF_NOERROR )
1152 : {
1153 0 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_raw error." } );
1154 : }
1155 :
1156 25 : rv = xrif_allocate_reordered( m_xrif_timing );
1157 25 : if( rv != XRIF_NOERROR )
1158 : {
1159 1 : return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error." } );
1160 : }
1161 :
1162 24 : return 0;
1163 : }
1164 :
1165 3 : void streamWriter::fgThreadStart( streamWriter *o )
1166 : {
1167 3 : o->fgThreadExec();
1168 3 : }
1169 :
1170 8 : void streamWriter::fgThreadExec()
1171 : {
1172 8 : m_fgThreadID = syscall( SYS_gettid );
1173 :
1174 : // Wait fpr the thread starter to finish initializing this thread.
1175 11 : while( m_fgThreadInit == true && m_shutdown == 0 )
1176 : {
1177 3 : sleep( 1 );
1178 : }
1179 :
1180 : timespec missing_ts;
1181 :
1182 : IMAGE image;
1183 8 : ino_t inode = 0; // The inode of the image stream file
1184 :
1185 8 : bool opened = false;
1186 :
1187 14 : while( m_shutdown == 0 )
1188 : {
1189 : /* Initialize ImageStreamIO
1190 : */
1191 6 : opened = false;
1192 6 : m_restart = false; // Set this up front, since we're about to restart.
1193 :
1194 6 : sem_t *sem{ nullptr }; ///< The semaphore to monitor for new image data
1195 :
1196 6 : int logged = 0;
1197 12 : while( !opened && !m_shutdown && !m_restart )
1198 : {
1199 : // b/c ImageStreamIO prints every single time, and latest version don't support stopping it yet,
1200 : // and that isn't thread-safe-able anyway
1201 : // we do our own checks. This is the same code in ImageStreamIO_openIm...
1202 : int SM_fd;
1203 : char SM_fname[200];
1204 6 : ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1205 6 : SM_fd = open( SM_fname, O_RDWR );
1206 6 : if( SM_fd == -1 )
1207 : {
1208 0 : if( !logged )
1209 : {
1210 0 : log<text_log>( "ImageStream " + m_shmimName + " not found (yet). Retrying . . .",
1211 : logPrio::LOG_NOTICE );
1212 : }
1213 0 : logged = 1;
1214 0 : sleep( 1 ); // be patient
1215 0 : continue;
1216 : }
1217 :
1218 : // Found and opened, close it and then use ImageStreamIO
1219 6 : logged = 0;
1220 6 : close( SM_fd );
1221 :
1222 6 : if( ImageStreamIO_openIm( &image, m_shmimName.c_str() ) == 0 )
1223 : {
1224 6 : if( image.md[0].sem < SEMAPHORE_MAXVAL )
1225 : {
1226 0 : ImageStreamIO_closeIm( &image );
1227 0 : mx::sys::sleep( 1 ); // We just need to wait for the server process to finish startup.
1228 : }
1229 : else
1230 : {
1231 6 : opened = true;
1232 :
1233 : char SM_fname[200];
1234 6 : ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1235 :
1236 : struct stat buffer;
1237 6 : int rv = stat( SM_fname, &buffer );
1238 :
1239 6 : if( rv != 0 )
1240 : {
1241 0 : log<software_critical>( { __FILE__,
1242 : __LINE__,
1243 0 : errno,
1244 0 : "Could not get inode for " + m_shmimName +
1245 : ". Source process will need to be restarted." } );
1246 0 : ImageStreamIO_closeIm( &image );
1247 0 : return;
1248 : }
1249 6 : inode = buffer.st_ino;
1250 : }
1251 : }
1252 : else
1253 : {
1254 0 : mx::sys::sleep( 1 ); // be patient
1255 : }
1256 : }
1257 :
1258 6 : if( m_restart )
1259 0 : continue; // this is kinda dumb. we just go around on restart, so why test in the while loop at all?
1260 :
1261 6 : if( m_shutdown || !opened )
1262 : {
1263 0 : if( !opened )
1264 0 : return;
1265 :
1266 0 : ImageStreamIO_closeIm( &image );
1267 0 : return;
1268 : }
1269 :
1270 : // now get a good semaphore
1271 6 : m_semaphoreNumber =
1272 6 : ImageStreamIO_getsemwaitindex( &image, m_semaphoreNumber ); // ask for semaphore we had before
1273 :
1274 6 : if( m_semaphoreNumber < 0 )
1275 : {
1276 0 : log<software_critical>(
1277 : { __FILE__,
1278 : __LINE__,
1279 0 : "No valid semaphore found for " + m_shmimName + ". Source process will need to be restarted." } );
1280 0 : return;
1281 : }
1282 :
1283 6 : log<software_info>( { __FILE__,
1284 : __LINE__,
1285 12 : "got semaphore index " + std::to_string( m_semaphoreNumber ) + " for " + m_shmimName } );
1286 :
1287 6 : ImageStreamIO_semflush( &image, m_semaphoreNumber );
1288 :
1289 6 : sem = image.semptr[m_semaphoreNumber]; ///< The semaphore to monitor for new image data
1290 :
1291 6 : m_dataType = image.md[0].datatype;
1292 6 : m_typeSize = ImageStreamIO_typesize( m_dataType );
1293 6 : m_width = image.md[0].size[0];
1294 6 : m_height = image.md[0].size[1];
1295 : size_t length;
1296 6 : if( image.md[0].naxis == 3 )
1297 : {
1298 6 : length = image.md[0].size[2];
1299 : }
1300 : else
1301 : {
1302 0 : length = 1;
1303 : }
1304 6 : std::cerr << "connected" << " " << m_width << "x" << m_height << "x" << (int)m_dataType << " (" << m_typeSize
1305 6 : << ")" << std::endl;
1306 :
1307 : // Now allocate the circBuffs
1308 6 : if( allocate_circbufs() < 0 )
1309 : {
1310 0 : return; // will cause shutdown!
1311 : }
1312 :
1313 : // And allocate the xrifs
1314 6 : if( allocate_xrif() < 0 )
1315 : {
1316 0 : return; // Will cause shutdown!
1317 : }
1318 :
1319 : // Buffer and XRIF setup can take long enough for monitor streams to accumulate
1320 : // stale semaphore posts. Flush again and re-baseline counters so we do not
1321 : // mistake startup backlog for repeated identical frames.
1322 6 : ImageStreamIO_semflush( &image, m_semaphoreNumber );
1323 :
1324 : uint8_t atype;
1325 : size_t snx, sny, snz;
1326 6 : bool useFrameArrays = image.cntarray != nullptr && length > 1;
1327 6 : bool useCnt1 = length > 1;
1328 6 : bool streamIsCube = image.md[0].naxis == 3;
1329 6 : size_t frameBytes = m_width * m_height * m_typeSize;
1330 6 : uint64_t maxChunkTimeNs = static_cast<uint64_t>( m_maxChunkTime * 1e9 );
1331 :
1332 : uint64_t curr_image; // The current cnt1 index
1333 6 : m_currImage = 0;
1334 6 : m_currChunkStart = 0;
1335 6 : m_nextChunkStart = 0;
1336 :
1337 : // Initialize curr_image after the post-setup flush.
1338 6 : if( useCnt1 )
1339 : {
1340 5 : curr_image = image.md[0].cnt1;
1341 : }
1342 : else
1343 : {
1344 1 : curr_image = 0;
1345 : }
1346 :
1347 : uint64_t last_cnt0; // = ((uint64_t)-1);
1348 :
1349 : // so we can initialize last_cnt0 to avoid frame skip on startup
1350 6 : if( useFrameArrays )
1351 : {
1352 5 : last_cnt0 = image.cntarray[curr_image];
1353 : }
1354 : else
1355 : {
1356 1 : last_cnt0 = image.md[0].cnt0;
1357 : }
1358 :
1359 6 : int cnt0flag = 0;
1360 :
1361 6 : bool restartWriting = false; // flag to prevent logging on a logging restart
1362 :
1363 : // This is the main image grabbing loop.
1364 286 : while( !m_shutdown && !m_restart )
1365 : {
1366 : timespec ts;
1367 280 : XWC_SEM_WAIT_TS_RETVOID( ts, m_semWaitSec, m_semWaitNSec );
1368 :
1369 280 : if( sem_timedwait( sem, &ts ) == 0 )
1370 : {
1371 : // Drain
1372 13 : while( sem_trywait( sem ) == 0 )
1373 : {
1374 : }
1375 :
1376 13 : if( errno != EAGAIN && errno != EINTR )
1377 : {
1378 0 : if( !m_shutdown && !m_restart )
1379 : {
1380 0 : log<software_error>( { __FILE__, __LINE__, errno, "sem_trywait" } );
1381 : }
1382 0 : break;
1383 : }
1384 :
1385 13 : if( useCnt1 )
1386 : {
1387 12 : curr_image = image.md[0].cnt1;
1388 : }
1389 : else
1390 : {
1391 1 : curr_image = 0;
1392 : }
1393 :
1394 13 : atype = image.md[0].datatype;
1395 13 : snx = image.md[0].size[0];
1396 13 : sny = image.md[0].size[1];
1397 13 : if( streamIsCube )
1398 : {
1399 13 : snz = image.md[0].size[2];
1400 : }
1401 : else
1402 : {
1403 0 : snz = 1;
1404 : }
1405 :
1406 13 : if( atype != m_dataType || snx != m_width || sny != m_height || snz != length )
1407 : {
1408 : break; // exit the nearest while loop and get the new image setup.
1409 : }
1410 :
1411 13 : if( m_shutdown || m_restart )
1412 : {
1413 : break; // Check for exit signals
1414 : }
1415 :
1416 : uint64_t new_cnt0;
1417 13 : if( useFrameArrays )
1418 : {
1419 12 : new_cnt0 = image.cntarray[curr_image];
1420 : }
1421 : else
1422 : {
1423 1 : new_cnt0 = image.md[0].cnt0;
1424 : }
1425 :
1426 : // clang-format off
1427 : #ifdef SW_DEBUG
1428 : std::cerr << "new_cnt0: " << new_cnt0 << "\n";
1429 : #endif
1430 : // clang-format on
1431 :
1432 13 : if( new_cnt0 == last_cnt0 )
1433 : {
1434 1 : ++m_repeatSemaphoreCount;
1435 1 : ++cnt0flag;
1436 1 : if( cnt0flag > 10 ) // Because of the drain this shouldn't happen
1437 : {
1438 0 : m_restart = true; // if we get here 10 times then something else is wrong.
1439 : }
1440 1 : continue;
1441 : }
1442 :
1443 12 : if( new_cnt0 - last_cnt0 > 1 ) //<- this is what we want to check.
1444 : {
1445 3 : m_skippedFrameCount += ( new_cnt0 - last_cnt0 - 1 );
1446 : }
1447 :
1448 12 : cnt0flag = 0;
1449 :
1450 12 : last_cnt0 = new_cnt0;
1451 :
1452 12 : char *curr_dest = m_rawImageCircBuff + m_currImage * frameBytes;
1453 12 : char *curr_src = reinterpret_cast<char *>( image.array.raw ) + curr_image * frameBytes;
1454 :
1455 12 : memcpy( curr_dest, curr_src, frameBytes );
1456 :
1457 12 : uint64_t *curr_timing = m_timingCircBuff + 5 * m_currImage;
1458 :
1459 12 : if( useFrameArrays )
1460 : {
1461 11 : curr_timing[0] = image.cntarray[curr_image];
1462 11 : curr_timing[1] = image.atimearray[curr_image].tv_sec;
1463 11 : curr_timing[2] = image.atimearray[curr_image].tv_nsec;
1464 11 : curr_timing[3] = image.writetimearray[curr_image].tv_sec;
1465 11 : curr_timing[4] = image.writetimearray[curr_image].tv_nsec;
1466 : }
1467 : else
1468 : {
1469 1 : curr_timing[0] = image.md[0].cnt0;
1470 1 : curr_timing[1] = image.md[0].atime.tv_sec;
1471 1 : curr_timing[2] = image.md[0].atime.tv_nsec;
1472 1 : curr_timing[3] = image.md[0].writetime.tv_sec;
1473 1 : curr_timing[4] = image.md[0].writetime.tv_nsec;
1474 : }
1475 :
1476 : // Check if we need to time-stamp ourselves -- for old cacao streams
1477 12 : if( curr_timing[1] == 0 )
1478 : {
1479 :
1480 1 : if( clock_gettime( CLOCK_REALTIME, &missing_ts ) < 0 )
1481 : {
1482 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "clock_gettime" } );
1483 0 : return;
1484 : }
1485 :
1486 1 : curr_timing[1] = missing_ts.tv_sec;
1487 1 : curr_timing[2] = missing_ts.tv_nsec;
1488 : }
1489 :
1490 : // just set w-time to a-time if it's missing
1491 12 : if( curr_timing[3] == 0 )
1492 : {
1493 1 : curr_timing[3] = curr_timing[1];
1494 1 : curr_timing[4] = curr_timing[2];
1495 : }
1496 :
1497 12 : m_currImageTime = curr_timing[3] * 1000000000ULL + curr_timing[4];
1498 :
1499 12 : if( m_shutdown && m_writing == WRITING )
1500 : {
1501 0 : m_writing = STOP_WRITING;
1502 : }
1503 :
1504 12 : switch( m_writing )
1505 : {
1506 5 : case START_WRITING:
1507 :
1508 5 : m_currChunkStart = m_currImage;
1509 5 : m_nextChunkStart = ( m_currImage / m_writeChunkLength ) * m_writeChunkLength;
1510 5 : m_currChunkStartTime = m_currImageTime;
1511 :
1512 5 : if( !restartWriting ) // We only log if this is really a start
1513 : {
1514 5 : log<saving_start>( { 1, new_cnt0 } );
1515 : }
1516 : else // on a restart after a timeout we don't log
1517 : {
1518 0 : restartWriting = false;
1519 : }
1520 :
1521 5 : m_writing = WRITING;
1522 :
1523 : // fall through
1524 7 : case WRITING:
1525 7 : if( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 )
1526 : {
1527 1 : m_currSaveStart = m_currChunkStart;
1528 1 : m_currSaveStop = m_nextChunkStart + m_writeChunkLength;
1529 1 : m_currSaveStopFrameNo = new_cnt0;
1530 :
1531 : #ifdef SW_DEBUG
1532 : std::cerr << __FILE__ << " " << __LINE__ << " WRITING " << m_currImage << " "
1533 : << m_nextChunkStart << " "
1534 : << ( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 ) << " "
1535 : << ( m_currImageTime - m_currChunkStartTime > maxChunkTimeNs ) << " " << new_cnt0
1536 : << "\n";
1537 : #endif
1538 :
1539 : // Now tell the writer to get going
1540 1 : if( sem_post( &m_swSemaphore ) < 0 )
1541 : {
1542 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1543 0 : return;
1544 : }
1545 :
1546 1 : m_nextChunkStart = ( ( m_currImage + 1 ) / m_writeChunkLength ) * m_writeChunkLength;
1547 1 : if( m_nextChunkStart >= m_circBuffLength )
1548 : {
1549 0 : m_nextChunkStart = 0;
1550 : }
1551 :
1552 1 : m_currChunkStart = m_nextChunkStart;
1553 1 : m_currChunkStartTime = m_currImageTime;
1554 : }
1555 6 : else if( m_currImageTime - m_currChunkStartTime > maxChunkTimeNs )
1556 : {
1557 0 : m_currSaveStart = m_currChunkStart;
1558 0 : m_currSaveStop = m_currImage + 1;
1559 0 : m_currSaveStopFrameNo = new_cnt0;
1560 :
1561 : // clang-format off
1562 : #ifdef SW_DEBUG
1563 : std::cerr << __FILE__ << " " << __LINE__ << " IMAGE TIME WRITING " << m_currImage << " "
1564 : << m_nextChunkStart << " "
1565 : << ( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 ) << " "
1566 : << ( m_currImageTime - m_currChunkStartTime > maxChunkTimeNs ) << " " << new_cnt0
1567 : << "\n";
1568 : #endif
1569 : // clang-format on
1570 :
1571 : // Now tell the writer to get going
1572 0 : if( sem_post( &m_swSemaphore ) < 0 )
1573 : {
1574 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1575 0 : return;
1576 : }
1577 :
1578 0 : m_writing = START_WRITING;
1579 0 : restartWriting = true;
1580 : }
1581 7 : break;
1582 :
1583 1 : case STOP_WRITING:
1584 1 : m_currSaveStart = m_currChunkStart;
1585 1 : m_currSaveStop = m_currImage + 1;
1586 1 : m_currSaveStopFrameNo = new_cnt0;
1587 :
1588 : // clang-format off
1589 : #ifdef SW_DEBUG
1590 : std::cerr << __FILE__ << " " << __LINE__ << " STOP_WRITING\n";
1591 : #endif
1592 : // clang-format on
1593 :
1594 : // Now tell the writer to get going
1595 1 : if( sem_post( &m_swSemaphore ) < 0 )
1596 : {
1597 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1598 0 : return;
1599 : }
1600 1 : restartWriting = false;
1601 1 : break;
1602 :
1603 4 : default:
1604 4 : break;
1605 : }
1606 :
1607 12 : ++m_currImage;
1608 12 : if( m_currImage >= m_circBuffLength )
1609 : {
1610 0 : m_currImage = 0;
1611 : }
1612 : }
1613 : else
1614 : {
1615 : // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1616 : // we can also get here if a signal interrupts the sem wait which is triggered by INDI callbacks
1617 267 : switch( m_writing )
1618 : {
1619 211 : case WRITING:
1620 : // Here, if there is at least 1 image, we check for delta-time > m_maxChunkTime
1621 : // then write
1622 418 : if( ( m_currImage - m_nextChunkStart > 0 ) &&
1623 207 : ( static_cast<uint64_t>( mx::sys::get_curr_time() * 1e9 ) - m_currChunkStartTime >
1624 : maxChunkTimeNs ) )
1625 : {
1626 1 : m_currSaveStart = m_currChunkStart;
1627 1 : m_currSaveStop = m_currImage;
1628 1 : m_currSaveStopFrameNo = last_cnt0;
1629 :
1630 : #ifdef SW_DEBUG
1631 : std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT WRITING " << " " << m_currImage << " "
1632 : << m_nextChunkStart << " " << ( m_currImage - m_nextChunkStart ) << " " << last_cnt0
1633 : << "\n";
1634 : #endif
1635 :
1636 : // Now tell the writer to get going
1637 1 : if( sem_post( &m_swSemaphore ) < 0 )
1638 : {
1639 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1640 0 : return;
1641 : }
1642 :
1643 1 : m_writing = START_WRITING;
1644 1 : restartWriting = true;
1645 : }
1646 211 : break;
1647 9 : case STOP_WRITING:
1648 : // If we timed-out while STOP_WRITING is set, we trigger a write.
1649 9 : m_currSaveStart = m_currChunkStart;
1650 9 : m_currSaveStop = m_currImage;
1651 9 : m_currSaveStopFrameNo = last_cnt0;
1652 :
1653 : #ifdef SW_DEBUG
1654 : std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT STOP_WRITING\n";
1655 : #endif
1656 :
1657 : // Now tell the writer to get going
1658 9 : if( sem_post( &m_swSemaphore ) < 0 )
1659 : {
1660 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1661 0 : return;
1662 : }
1663 9 : restartWriting = false;
1664 9 : break;
1665 47 : default:
1666 47 : break;
1667 : }
1668 :
1669 267 : if( image.md[0].sem <= 0 )
1670 : {
1671 0 : break; // Indicates that the server has cleaned up.
1672 : }
1673 :
1674 : // Check for why we timed out
1675 267 : if( errno == EINTR )
1676 : {
1677 0 : break; // This will indicate time to shutdown, loop will exit normally flags set.
1678 : }
1679 :
1680 : // ETIMEDOUT just means we should wait more.
1681 : // Otherwise, report an error.
1682 267 : if( errno != ETIMEDOUT )
1683 : {
1684 0 : log<software_error>( { __FILE__, __LINE__, errno, "sem_timedwait" } );
1685 0 : break;
1686 : }
1687 :
1688 : // Check if the file has disappeared.
1689 : int SM_fd;
1690 : char SM_fname[200];
1691 267 : ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
1692 267 : SM_fd = open( SM_fname, O_RDWR );
1693 267 : if( SM_fd == -1 )
1694 : {
1695 0 : m_restart = true;
1696 : }
1697 267 : close( SM_fd );
1698 :
1699 : // Check if the inode changed
1700 : struct stat buffer;
1701 267 : int rv = stat( SM_fname, &buffer );
1702 267 : if( rv != 0 )
1703 : {
1704 0 : m_restart = true;
1705 : }
1706 :
1707 267 : if( buffer.st_ino != inode )
1708 : {
1709 : #ifdef SW_DEBUG
1710 : std::cerr << "Restarting due to inode . . . \n";
1711 : #endif
1712 1 : m_restart = true;
1713 : }
1714 : }
1715 : }
1716 :
1717 : ///\todo might still be writing here, so must check
1718 : // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1719 6 : if( m_writing == WRITING || m_writing == STOP_WRITING )
1720 : {
1721 : // Here, if there is at least 1 image, then write
1722 1 : if( ( m_currImage - m_nextChunkStart > 0 ) )
1723 : {
1724 1 : m_currSaveStart = m_currChunkStart;
1725 1 : m_currSaveStop = m_currImage;
1726 1 : m_currSaveStopFrameNo = last_cnt0;
1727 :
1728 1 : m_writing = STOP_WRITING;
1729 :
1730 1 : std::cerr << __FILE__ << " " << __LINE__ << " WRITING ON RESTART " << last_cnt0 << "\n";
1731 : // Now tell the writer to get going
1732 1 : if( sem_post( &m_swSemaphore ) < 0 )
1733 : {
1734 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
1735 0 : return;
1736 : }
1737 : }
1738 : else
1739 : {
1740 0 : m_writing = NOT_WRITING;
1741 : }
1742 :
1743 2 : while( m_writing != NOT_WRITING )
1744 : {
1745 1 : std::cerr << __FILE__ << " " << __LINE__ << " WAITING TO FINISH WRITING " << last_cnt0 << "\n";
1746 1 : sleep( 1 );
1747 : }
1748 : }
1749 :
1750 6 : if( m_rawImageCircBuff )
1751 : {
1752 6 : free( m_rawImageCircBuff );
1753 6 : m_rawImageCircBuff = 0;
1754 : }
1755 :
1756 6 : if( m_timingCircBuff )
1757 : {
1758 6 : free( m_timingCircBuff );
1759 6 : m_timingCircBuff = 0;
1760 : }
1761 :
1762 6 : if( opened )
1763 : {
1764 6 : if( m_semaphoreNumber >= 0 )
1765 : {
1766 : ///\todo is this release necessary with closeIM?
1767 6 : image.semReadPID[m_semaphoreNumber] = 0; // release semaphore
1768 : }
1769 6 : ImageStreamIO_closeIm( &image );
1770 6 : opened = false;
1771 : }
1772 :
1773 : } // outer loop, will exit if m_shutdown==true
1774 :
1775 : // One more check
1776 8 : if( m_rawImageCircBuff )
1777 : {
1778 0 : free( m_rawImageCircBuff );
1779 0 : m_rawImageCircBuff = 0;
1780 : }
1781 :
1782 8 : if( m_timingCircBuff )
1783 : {
1784 0 : free( m_timingCircBuff );
1785 0 : m_timingCircBuff = 0;
1786 : }
1787 :
1788 8 : if( opened )
1789 : {
1790 0 : if( m_semaphoreNumber >= 0 )
1791 : {
1792 : ///\todo is this release necessary with closeIM?
1793 0 : image.semReadPID[m_semaphoreNumber] = 0; // release semaphore.
1794 : }
1795 :
1796 0 : ImageStreamIO_closeIm( &image );
1797 : }
1798 : }
1799 :
1800 3 : void streamWriter::swThreadStart( streamWriter *s )
1801 : {
1802 3 : s->swThreadExec();
1803 3 : }
1804 :
1805 3 : void streamWriter::swThreadExec()
1806 : {
1807 3 : m_swThreadID = syscall( SYS_gettid );
1808 :
1809 : // Wait fpr the thread starter to finish initializing this thread.
1810 6 : while( m_swThreadInit == true && m_shutdown == 0 )
1811 : {
1812 3 : sleep( 1 );
1813 : }
1814 :
1815 3 : while( !m_shutdown )
1816 : {
1817 0 : while( !shutdown() && ( !( state() == stateCodes::READY || state() == stateCodes::OPERATING ) ) )
1818 : {
1819 0 : sleep( 1 );
1820 : }
1821 :
1822 0 : if( shutdown() )
1823 : {
1824 0 : break;
1825 : }
1826 :
1827 : timespec ts;
1828 :
1829 0 : if( clock_gettime( CLOCK_REALTIME, &ts ) < 0 )
1830 : {
1831 0 : log<software_critical>( { __FILE__, __LINE__, errno, 0, "clock_gettime" } );
1832 0 : return; // will trigger a shutdown
1833 : }
1834 :
1835 0 : mx::sys::timespecAddNsec( ts, m_semWaitNSec );
1836 :
1837 0 : if( sem_timedwait( &m_swSemaphore, &ts ) == 0 )
1838 : {
1839 0 : if( doEncode() < 0 )
1840 : {
1841 0 : log<software_critical>( { __FILE__, __LINE__, "error encoding data" } );
1842 0 : return;
1843 : }
1844 : // Otherwise, success, and we just go on.
1845 : }
1846 : else
1847 : {
1848 : // Check for why we timed out
1849 0 : if( errno == EINTR )
1850 : {
1851 0 : continue; // This will probably indicate time to shutdown, loop will exit normally if flags set.
1852 : }
1853 :
1854 : // ETIMEDOUT just means we should wait more.
1855 : // Otherwise, report an error.
1856 0 : if( errno != ETIMEDOUT )
1857 : {
1858 0 : log<software_error>( { __FILE__, __LINE__, errno, "sem_timedwait" } );
1859 0 : break;
1860 : }
1861 : }
1862 : } // outer loop, will exit if m_shutdown==true
1863 : }
1864 :
1865 20 : int streamWriter::doEncode()
1866 : {
1867 20 : if( m_writing == NOT_WRITING )
1868 : {
1869 1 : return 0;
1870 : }
1871 :
1872 19 : recordSavingState( true );
1873 :
1874 : // Record these to prevent a change in other thread
1875 19 : uint64_t saveStart = m_currSaveStart;
1876 19 : uint64_t saveStopFrameNo = m_currSaveStopFrameNo;
1877 19 : size_t nFrames = m_currSaveStop - saveStart;
1878 19 : size_t nBytes = m_width * m_height * m_typeSize;
1879 :
1880 : #ifdef SW_DEBUG
1881 : std::cerr << "nFrames: " << nFrames << "\n";
1882 : #endif
1883 :
1884 19 : if( nFrames == 0 ) // can happend during a stop. just clean up but don't try to write nothting.
1885 : {
1886 : #ifdef SW_DEBUG
1887 : std::cerr << "nothing to write\n";
1888 : #endif
1889 :
1890 1 : recordSavingStats( true );
1891 :
1892 1 : if( m_writing == STOP_WRITING )
1893 : {
1894 1 : m_writing = NOT_WRITING;
1895 1 : log<saving_stop>( { 0, saveStopFrameNo } );
1896 : }
1897 :
1898 1 : recordSavingState( true );
1899 :
1900 1 : return 0;
1901 : }
1902 : // Configure xrif and copy image data -- this does no allocations
1903 18 : int rv = xrif_set_size( m_xrif, m_width, m_height, 1, nFrames, m_dataType );
1904 18 : if( rv != XRIF_NOERROR )
1905 : {
1906 : // This is a big problem. Report it as "ALERT" and go on.
1907 0 : log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST" } );
1908 : }
1909 :
1910 18 : rv = xrif_set_lz4_acceleration( m_xrif, m_lz4accel );
1911 18 : if( rv != XRIF_NOERROR )
1912 : {
1913 : // This may just be out of range, it's only an error.
1914 0 : log<software_error>( { __FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error." } );
1915 : }
1916 :
1917 18 : memcpy( m_xrif->raw_buffer, m_rawImageCircBuff + saveStart * nBytes, nFrames * nBytes );
1918 :
1919 : // Configure xrif and copy timing data -- no allocations
1920 18 : rv = xrif_set_size( m_xrif_timing, 5, 1, 1, nFrames, XRIF_TYPECODE_UINT64 );
1921 18 : if( rv != XRIF_NOERROR )
1922 : {
1923 : // This is a big problem. Report it as "ALERT" and go on.
1924 0 : log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST." } );
1925 : }
1926 :
1927 18 : rv = xrif_set_lz4_acceleration( m_xrif_timing, m_lz4accel );
1928 18 : if( rv != XRIF_NOERROR )
1929 : {
1930 : // This may just be out of range, it's only an error.
1931 1 : log<software_error>( { __FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error." } );
1932 : }
1933 :
1934 : #ifdef SW_DEBUG
1935 : for( size_t nF = 0; nF < nFrames; ++nF )
1936 : {
1937 : std::cerr << " " << ( m_timingCircBuff + saveStart * 5 + nF * 5 )[0] << "\n";
1938 : }
1939 : #endif
1940 :
1941 18 : memcpy( m_xrif_timing->raw_buffer, m_timingCircBuff + saveStart * 5, nFrames * 5 * sizeof( uint64_t ) );
1942 :
1943 18 : rv = xrif_encode( m_xrif );
1944 18 : if( rv != XRIF_NOERROR )
1945 : {
1946 : // This is a big problem. Report it as "ALERT" and go on.
1947 0 : log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST." } );
1948 : }
1949 :
1950 18 : rv = xrif_write_header( m_xrif_header, m_xrif );
1951 18 : if( rv != XRIF_NOERROR )
1952 : {
1953 : // This is a big problem. Report it as "ALERT" and go on.
1954 0 : log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST." } );
1955 : }
1956 :
1957 18 : rv = xrif_encode( m_xrif_timing );
1958 18 : if( rv != XRIF_NOERROR )
1959 : {
1960 : // This is a big problem. Report it as "ALERT" and go on.
1961 1 : log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST." } );
1962 : }
1963 :
1964 18 : rv = xrif_write_header( m_xrif_timing_header, m_xrif_timing );
1965 18 : if( rv != XRIF_NOERROR )
1966 : {
1967 : // This is a big problem. Report it as "ALERT" and go on.
1968 1 : log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST" } );
1969 : }
1970 :
1971 : // Now break down the acq time of the first image in the buffer for use in file name
1972 : // tm uttime; // The broken down time.
1973 18 : timespec *fts = reinterpret_cast<timespec *>( m_timingCircBuff + saveStart * 5 + 1 );
1974 :
1975 18 : std::string fileName;
1976 18 : std::string relPath;
1977 36 : mx::error_t errc = file::fileTimeRelPath( fileName, relPath, m_outName, "xrif", fts->tv_sec, fts->tv_nsec );
1978 18 : if( errc != mx::error_t::noerror )
1979 : {
1980 1 : std::string msg = "error from file::fileTimeRePath: ";
1981 1 : msg += mx::errorMessage( errc );
1982 1 : msg += " (" + std::string( mx::errorName( errc ) ) + ")";
1983 1 : return log<software_error, -1>( { __FILE__, __LINE__, msg } );
1984 1 : }
1985 :
1986 17 : std::string fullPath = m_rawimageDir + '/' + relPath;
1987 :
1988 : try
1989 : {
1990 18 : std::filesystem::create_directories( fullPath ); // this does nothing if fname already exists
1991 : }
1992 1 : catch( const std::filesystem::filesystem_error &e )
1993 : {
1994 1 : std::string msg = "filesystem_error from std::create_directories. ";
1995 1 : msg += e.what();
1996 1 : msg += " code: ";
1997 1 : msg += e.code().value();
1998 1 : return log<software_critical, -1>( { __FILE__, __LINE__, msg } );
1999 1 : }
2000 0 : catch( const std::exception &e )
2001 : {
2002 0 : std::string msg = "exception from std::create_directories. ";
2003 0 : msg += e.what();
2004 0 : return log<software_critical, -1>( { __FILE__, __LINE__, msg } );
2005 0 : }
2006 :
2007 16 : m_outFilePath = fullPath + '/' + fileName;
2008 16 : FILE *fp_xrif = fopen( m_outFilePath.c_str(), "wb" );
2009 16 : if( fp_xrif == NULL )
2010 : {
2011 : // This is it. If we can't write data to disk need to fix.
2012 1 : return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "failed to open file for writing" } );
2013 : }
2014 :
2015 15 : size_t bw = fwrite( m_xrif_header, sizeof( uint8_t ), XRIF_HEADER_SIZE, fp_xrif );
2016 :
2017 15 : if( bw != XRIF_HEADER_SIZE )
2018 : {
2019 0 : log<software_alert>( { __FILE__,
2020 : __LINE__,
2021 0 : errno,
2022 : 0,
2023 0 : "failure writing header to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
2024 : // We go on . . .
2025 : }
2026 :
2027 15 : bw = fwrite( m_xrif->raw_buffer, sizeof( uint8_t ), m_xrif->compressed_size, fp_xrif );
2028 :
2029 15 : if( bw != m_xrif->compressed_size )
2030 : {
2031 1 : log<software_alert>( { __FILE__,
2032 : __LINE__,
2033 1 : errno,
2034 : 0,
2035 2 : "failure writing data to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
2036 : }
2037 :
2038 15 : bw = fwrite( m_xrif_timing_header, sizeof( uint8_t ), XRIF_HEADER_SIZE, fp_xrif );
2039 :
2040 15 : if( bw != XRIF_HEADER_SIZE )
2041 : {
2042 0 : log<software_alert>(
2043 : { __FILE__,
2044 : __LINE__,
2045 0 : errno,
2046 : 0,
2047 0 : "failure writing timing header to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
2048 : }
2049 :
2050 15 : bw = fwrite( m_xrif_timing->raw_buffer, sizeof( uint8_t ), m_xrif_timing->compressed_size, fp_xrif );
2051 :
2052 15 : if( bw != m_xrif_timing->compressed_size )
2053 : {
2054 1 : log<software_alert>(
2055 : { __FILE__,
2056 : __LINE__,
2057 1 : errno,
2058 : 0,
2059 2 : "failure writing timing data to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
2060 : }
2061 :
2062 15 : fclose( fp_xrif );
2063 :
2064 15 : recordSavingStats( true );
2065 :
2066 15 : if( m_writing == STOP_WRITING )
2067 : {
2068 1 : m_writing = NOT_WRITING;
2069 1 : log<saving_stop>( { 0, saveStopFrameNo } );
2070 : }
2071 :
2072 15 : recordSavingState( true );
2073 :
2074 15 : return 0;
2075 :
2076 18 : } // doEncode
2077 :
2078 11 : INDI_NEWCALLBACK_DEFN( streamWriter, m_indiP_writing )
2079 : ( const pcf::IndiProperty &ipRecv )
2080 : {
2081 11 : INDI_VALIDATE_CALLBACK_PROPS( m_indiP_writing, ipRecv );
2082 :
2083 18 : if( !ipRecv.find( "toggle" ) )
2084 : {
2085 1 : return 0;
2086 : }
2087 :
2088 28 : if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::Off &&
2089 4 : ( m_writing == WRITING || m_writing == START_WRITING ) )
2090 : {
2091 3 : m_writing = STOP_WRITING;
2092 : }
2093 :
2094 24 : if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::On && m_writing == NOT_WRITING )
2095 : {
2096 2 : m_writing = START_WRITING;
2097 : }
2098 :
2099 8 : return 0;
2100 : }
2101 :
2102 10 : void streamWriter::updateINDI()
2103 : {
2104 : // Only update this if not changing
2105 10 : if( m_writing == NOT_WRITING || m_writing == WRITING )
2106 : {
2107 10 : if( m_xrif && m_writing == WRITING )
2108 : {
2109 4 : indi::updateSwitchIfChanged( m_indiP_writing, "toggle", pcf::IndiElement::On, m_indiDriver, INDI_OK );
2110 4 : indi::updateIfChanged( m_indiP_xrifStats, "ratio", m_xrif->compression_ratio, m_indiDriver, INDI_BUSY );
2111 4 : indi::updateIfChanged(
2112 4 : m_indiP_xrifStats, "encodeMBsec", m_xrif->encode_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2113 2 : indi::updateIfChanged( m_indiP_xrifStats,
2114 : "encodeFPS",
2115 2 : m_xrif->encode_rate / ( m_width * m_height * m_typeSize ),
2116 : m_indiDriver,
2117 : INDI_BUSY );
2118 4 : indi::updateIfChanged(
2119 4 : m_indiP_xrifStats, "differenceMBsec", m_xrif->difference_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2120 2 : indi::updateIfChanged( m_indiP_xrifStats,
2121 : "differenceFPS",
2122 2 : m_xrif->difference_rate / ( m_width * m_height * m_typeSize ),
2123 : m_indiDriver,
2124 : INDI_BUSY );
2125 4 : indi::updateIfChanged(
2126 4 : m_indiP_xrifStats, "reorderMBsec", m_xrif->reorder_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2127 2 : indi::updateIfChanged( m_indiP_xrifStats,
2128 : "reorderFPS",
2129 2 : m_xrif->reorder_rate / ( m_width * m_height * m_typeSize ),
2130 : m_indiDriver,
2131 : INDI_BUSY );
2132 4 : indi::updateIfChanged(
2133 4 : m_indiP_xrifStats, "compressMBsec", m_xrif->compress_rate / 1048576.0, m_indiDriver, INDI_BUSY );
2134 2 : indi::updateIfChanged( m_indiP_xrifStats,
2135 : "compressFPS",
2136 4 : m_xrif->compress_rate / ( m_width * m_height * m_typeSize ),
2137 : m_indiDriver,
2138 : INDI_BUSY );
2139 : }
2140 : else
2141 : {
2142 16 : indi::updateSwitchIfChanged( m_indiP_writing, "toggle", pcf::IndiElement::Off, m_indiDriver, INDI_OK );
2143 16 : indi::updateIfChanged( m_indiP_xrifStats, "ratio", 0.0, m_indiDriver, INDI_IDLE );
2144 16 : indi::updateIfChanged( m_indiP_xrifStats, "encodeMBsec", 0.0, m_indiDriver, INDI_IDLE );
2145 16 : indi::updateIfChanged( m_indiP_xrifStats, "encodeFPS", 0.0, m_indiDriver, INDI_IDLE );
2146 16 : indi::updateIfChanged( m_indiP_xrifStats, "differenceMBsec", 0.0, m_indiDriver, INDI_IDLE );
2147 16 : indi::updateIfChanged( m_indiP_xrifStats, "differenceFPS", 0.0, m_indiDriver, INDI_IDLE );
2148 16 : indi::updateIfChanged( m_indiP_xrifStats, "reorderMBsec", 0.0, m_indiDriver, INDI_IDLE );
2149 16 : indi::updateIfChanged( m_indiP_xrifStats, "reorderFPS", 0.0, m_indiDriver, INDI_IDLE );
2150 16 : indi::updateIfChanged( m_indiP_xrifStats, "compressMBsec", 0.0, m_indiDriver, INDI_IDLE );
2151 24 : indi::updateIfChanged( m_indiP_xrifStats, "compressFPS", 0.0, m_indiDriver, INDI_IDLE );
2152 : }
2153 : }
2154 10 : }
2155 :
2156 2 : int streamWriter::checkRecordTimes()
2157 : {
2158 2 : return telemeterT::checkRecordTimes( telem_saving_state() );
2159 : }
2160 :
2161 2 : int streamWriter::recordTelem( const telem_saving_state * )
2162 : {
2163 2 : return recordSavingState( true );
2164 : }
2165 :
2166 39 : int streamWriter::recordSavingState( bool force )
2167 : {
2168 : static int16_t lastState = -1;
2169 : static uint64_t currSaveStart = -1;
2170 :
2171 : int16_t state;
2172 39 : if( m_writing == WRITING || m_writing == START_WRITING ||
2173 5 : m_writing == STOP_WRITING ) // Changed from just writing 5/2024
2174 36 : state = 1;
2175 : else
2176 3 : state = 0;
2177 :
2178 39 : if( state != lastState || m_currSaveStart != currSaveStart || force )
2179 : {
2180 39 : telem<telem_saving_state>( { state, m_currSaveStart } );
2181 :
2182 39 : lastState = state;
2183 39 : currSaveStart = m_currSaveStart;
2184 : }
2185 :
2186 39 : return 0;
2187 : }
2188 :
2189 17 : int streamWriter::recordSavingStats( bool force )
2190 : {
2191 : static uint32_t last_rawSize = -1;
2192 : static uint32_t last_compressedSize = -1;
2193 : static float last_encodeRate = -1;
2194 : static float last_differenceRate = -1;
2195 : static float last_reorderRate = -1;
2196 : static float last_compressRate = -1;
2197 :
2198 17 : if( m_xrif->raw_size != last_rawSize || m_xrif->compressed_size != last_compressedSize ||
2199 4 : m_xrif->encode_rate != last_encodeRate || m_xrif->difference_rate != last_differenceRate ||
2200 0 : m_xrif->reorder_rate != last_reorderRate || m_xrif->compress_rate != last_compressRate || force )
2201 : {
2202 17 : telem<telem_saving>( { (uint32_t)m_xrif->raw_size,
2203 0 : (uint32_t)m_xrif->compressed_size,
2204 0 : (float)m_xrif->encode_rate,
2205 0 : (float)m_xrif->difference_rate,
2206 0 : (float)m_xrif->reorder_rate,
2207 17 : (float)m_xrif->compress_rate } );
2208 :
2209 17 : last_rawSize = m_xrif->raw_size;
2210 17 : last_compressedSize = m_xrif->compressed_size;
2211 17 : last_encodeRate = m_xrif->encode_rate;
2212 17 : last_differenceRate = m_xrif->difference_rate;
2213 17 : last_reorderRate = m_xrif->reorder_rate;
2214 17 : last_compressRate = m_xrif->compress_rate;
2215 : }
2216 :
2217 17 : return 0;
2218 : }
2219 :
2220 : } // namespace app
2221 : } // namespace MagAOX
2222 :
2223 : #endif
|