LCOV - code coverage report
Current view: top level - apps/streamWriter - streamWriter.hpp (source / functions) Coverage Total Hit
Test: MagAOX Lines: 21.6 % 758 164
Test Date: 2026-01-03 21:03:39 Functions: 38.5 % 26 10

            Line data    Source code
       1              : /** \file streamWriter.hpp
       2              :  * \brief The MagAO-X Image Stream Writer
       3              :  *
       4              :  * \author Jared R. Males (jaredmales@gmail.com)
       5              :  *
       6              :  * \ingroup streamWriter_files
       7              :  */
       8              : 
       9              : #ifndef streamWriter_hpp
      10              : #define streamWriter_hpp
      11              : 
      12              : #include <filesystem>
      13              : 
      14              : #include <ImageStreamIO/ImageStruct.h>
      15              : #include <ImageStreamIO/ImageStreamIO.h>
      16              : 
      17              : #include <xrif/xrif.h>
      18              : 
      19              : #include <mx/sys/timeUtils.hpp>
      20              : 
      21              : #include "../../libMagAOX/libMagAOX.hpp" //Note this is included on command line to trigger pch
      22              : #include "../../magaox_git_version.h"
      23              : 
      24              : #define NOT_WRITING ( 0 )
      25              : #define START_WRITING ( 1 )
      26              : #define WRITING ( 2 )
      27              : #define STOP_WRITING ( 3 )
      28              : 
      29              : // #define SW_DEBUG
      30              : 
      31              : namespace MagAOX
      32              : {
      33              : namespace app
      34              : {
      35              : 
      36              : /** \defgroup streamWriter ImageStreamIO Stream Writing
      37              :  *  \brief Writes the contents of an ImageStreamIO image stream to disk.
      38              :  *
      39              :  *  <a href="../handbook/operating/software/apps/streamWriter.html">Application Documentation</a>
      40              :  *
      41              :  *  \ingroup apps
      42              :  *
      43              :  */
      44              : 
      45              : /** \defgroup streamWriter_files ImageStreamIO Stream Writing
      46              :  * \ingroup streamWriter
      47              :  */
      48              : 
      49              : /** MagAO-X application to control writing ImageStreamIO streams to disk.
      50              :  *
      51              :  * \ingroup streamWriter
      52              :  *
      53              :  */
      54              : class streamWriter : public MagAOXApp<>, public dev::telemeter<streamWriter>
      55              : {
      56              :     typedef dev::telemeter<streamWriter> telemeterT;
      57              : 
      58              :     friend class dev::telemeter<streamWriter>;
      59              : 
      60              :     // Give the test harness access.
      61              :     friend class streamWriter_test;
      62              :     friend class streamWriter_data_test;
      63              : 
      64              :   protected:
      65              :     /** \name configurable parameters
      66              :      *@{
      67              :      */
      68              : 
      69              :     std::string m_rawimageDir; ///< The path where files will be saved.
      70              : 
      71              :     size_t m_maxCircBuffLength{ 1024 }; ///< The maximum length of the circular buffer, in frames
      72              : 
      73              :     double m_maxCircBuffSize{ 2048 }; ///< The maximum size of the circular buffer in MB.
      74              : 
      75              :     size_t m_maxWriteChunkLength{ 512 }; /**< The maximum number of frames to write at a time.  Must
      76              :                                               be an integer factor of m_maxCircBuffLength.*/
      77              : 
      78              :     double m_maxChunkTime{ 10 }; ///< The maximum time before writing regardless of number of frames.
      79              : 
      80              :     std::string m_shmimName; ///< The name of the shared memory buffer.
      81              : 
      82              :     std::string m_outName; ///< The name to use for output files.  Default is m_shmimName.
      83              : 
      84              :     int m_semaphoreNumber{ 7 }; ///< The image structure semaphore index.
      85              : 
      86              :     unsigned m_semWaitSec{ 0 }; /**< The time in whole sec to wait on the semaphore,
      87              :                                      to which m_semWaitNSec is added.  Default is 0 sec.*/
      88              : 
      89              :     unsigned m_semWaitNSec{ 500000000 }; /**< The time in nsec to wait on the semaphore, added to m_semWaitSec.
      90              :                                               Max is 999999999. Default is 5e8 nsec. */
      91              : 
      92              :     int m_lz4accel{ 1 };
      93              : 
      94              :     bool m_compress{ true };
      95              : 
      96              :     ///@}
      97              : 
      98              :     size_t m_circBuffLength{ 1024 };  ///< The length of the circular buffer, in frames
      99              :     double m_circBuffSize{ 2048.0 };  ///< The size of the circular buffer, in MB
     100              :     size_t m_writeChunkLength{ 512 }; ///< The number of frames to write at a time
     101              : 
     102              :     size_t  m_width{ 0 };    ///< The width of the image
     103              :     size_t  m_height{ 0 };   ///< The height of the image
     104              :     uint8_t m_dataType{ 0 }; ///< The ImageStreamIO type code.
     105              :     int     m_typeSize{ 0 }; ///< The pixel byte depth
     106              : 
     107              :     char     *m_rawImageCircBuff{ nullptr };
     108              :     uint64_t *m_timingCircBuff{ nullptr };
     109              : 
     110              :     size_t m_currImage{ 0 };
     111              : 
     112              :     double m_currImageTime{ 0 }; ///< The write-time of the current image
     113              : 
     114              :     double m_currChunkStartTime{ 0 }; ///< The write-time of the first image in the chunk
     115              : 
     116              :     // Writer book-keeping:
     117              :     int m_writing{ NOT_WRITING }; /**< Controls whether or not images are being written,
     118              :                    and sequences start and stop of writing.*/
     119              : 
     120              :     uint64_t m_currChunkStart{ 0 }; ///< The circular buffer starting position of the current to-be-written chunk.
     121              :     uint64_t m_nextChunkStart{ 0 }; ///< The circular buffer starting position of the next to-be-written chunk.
     122              : 
     123              :     uint64_t m_currSaveStart{ 0 }; ///< The circular buffer position at which to start saving.
     124              :     uint64_t m_currSaveStop{ 0 };  ///< The circular buffer position at which to stop saving.
     125              : 
     126              :     uint64_t m_currSaveStopFrameNo{ 0 }; ///< The frame number of the image at which saving stopped (for logging)
     127              : 
     128              :     /// The xrif compression handle for image data
     129              :     xrif_t m_xrif{ nullptr };
     130              : 
     131              :     /// Storage for the xrif image data file header
     132              :     char *m_xrif_header{ nullptr };
     133              : 
     134              :     /// The xrif compression handle for timing data
     135              :     xrif_t m_xrif_timing{ nullptr };
     136              : 
     137              :     /// Storage for the xrif timing data file header
     138              :     char *m_xrif_timing_header{ nullptr };
     139              : 
     140              :     std::string m_outFilePath; ///< The full path for the latest output file
     141              : 
     142              :   public:
     143              :     /// Default c'tor
     144              :     streamWriter();
     145              : 
     146              :     /// Destructor
     147              :     ~streamWriter() noexcept;
     148              : 
     149              :     /// Setup the configuration system (called by MagAOXApp::setup())
     150              :     virtual void setupConfig();
     151              : 
     152              :     /// load the configuration system results (called by MagAOXApp::setup())
     153              :     virtual void loadConfig();
     154              : 
     155              :     /// Startup functions
     156              :     /** Sets up the INDI vars.
     157              :      *
     158              :      */
     159              :     virtual int appStartup();
     160              : 
     161              :     /// Implementation of the FSM for the stream writer
     162              :     virtual int appLogic();
     163              : 
     164              :     /// Do any needed shutdown tasks.  Currently nothing in this app.
     165              :     virtual int appShutdown();
     166              : 
     167              :   protected:
     168              :     /** \name SIGSEGV & SIGBUS signal handling
     169              :      * These signals occur as a result of a ImageStreamIO source server resetting (e.g. changing frame sizes).
     170              :      * When they occur a restart of the framegrabber and framewriter thread main loops is triggered.
     171              :      *
     172              :      * @{
     173              :      */
     174              :     bool m_restart{ false };
     175              : 
     176              :     static streamWriter *m_selfWriter; ///< Static pointer to this (set in constructor).  Used for getting out of the
     177              :                                        ///< static SIGSEGV handler.
     178              : 
     179              :     /// Initialize the xrif system.
     180              :     /** Allocates the handles and headers pointers.
     181              :      *
     182              :      * \returns 0 on success.
     183              :      * \returns -1 on error.
     184              :      */
     185              :     int initialize_xrif();
     186              : 
     187              :     /// Sets the handler for SIGSEGV and SIGBUS
     188              :     /** These are caused by ImageStreamIO server resets.
     189              :      */
     190              :     int setSigSegvHandler();
     191              : 
     192              :     /// The handler called when SIGSEGV or SIGBUS is received, which will be due to ImageStreamIO server resets.  Just a
     193              :     /// wrapper for handlerSigSegv.
     194              :     static void _handlerSigSegv( int signum, siginfo_t *siginf, void *ucont );
     195              : 
     196              :     /// Handles SIGSEGV and SIGBUS.  Sets m_restart to true.
     197              :     void handlerSigSegv( int signum, siginfo_t *siginf, void *ucont );
     198              :     ///@}
     199              : 
     200              :     /** \name Framegrabber Thread
     201              :      * This thread monitors the ImageStreamIO buffer and copies its images to the circular buffer.
     202              :      *
     203              :      * @{
     204              :      */
     205              :     int m_fgThreadPrio{ 1 }; ///< Priority of the framegrabber thread, should normally be > 0.
     206              : 
     207              :     std::string m_fgCpuset; ///< The cpuset for the framegrabber thread.  Ignored if empty (the default).
     208              : 
     209              :     std::thread m_fgThread; ///< A separate thread for the actual framegrabbings
     210              : 
     211              :     bool m_fgThreadInit{ true }; ///< Synchronizer to ensure f.g. thread initializes before doing dangerous things.
     212              : 
     213              :     pid_t m_fgThreadID{ 0 }; ///< F.g. thread PID.
     214              : 
     215              :     pcf::IndiProperty m_fgThreadProp; ///< The property to hold the f.g. thread details.
     216              : 
     217              :   public:
     218              :     static void getCircBuffLengths( size_t  &circBuffLength,
     219              :                                     double  &circBuffSize,
     220              :                                     size_t  &writeChunkLength,
     221              :                                     size_t   maxCircBuffLength,
     222              :                                     double   maxCircBuffSize,
     223              :                                     size_t   maxWriteChunkLength,
     224              :                                     uint32_t width,
     225              :                                     uint32_t height,
     226              :                                     size_t   typeSize );
     227              : 
     228              :   protected:
     229              :     /// Worker function to allocate the circular buffers.
     230              :     /** This takes place in the fg thread after connecting to the stream.
     231              :      *
     232              :      * \returns 0 on success.
     233              :      * \returns -1 on error.
     234              :      */
     235              :     int allocate_circbufs();
     236              : 
     237              :     /// Worker function to configure and allocate the xrif handles.
     238              :     /** This takes place in the fg thread after connecting to the stream.
     239              :      *
     240              :      * \returns 0 on success.
     241              :      * \returns -1 on error.
     242              :      */
     243              :     int allocate_xrif();
     244              : 
     245              :     /// Thread starter, called by fgThreadStart on thread construction.  Calls fgThreadExec.
     246              :     static void fgThreadStart( streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */ );
     247              : 
     248              :     /// Execute the frame grabber main loop.
     249              :     void fgThreadExec();
     250              : 
     251              :     ///@}
     252              : 
     253              :     /** \name Stream Writer Thread
     254              :      * This thread writes chunks of the circular buffer to disk.
     255              :      *
     256              :      * @{
     257              :      */
     258              :     int m_swThreadPrio{ 1 }; ///< Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
     259              : 
     260              :     std::string m_swCpuset; ///< The cpuset for the stream writer thread.  Ignored if empty (the default).
     261              : 
     262              :     sem_t m_swSemaphore; ///< Semaphore used to synchronize the fg thread and the sw thread.
     263              : 
     264              :     std::thread m_swThread; ///< A separate thread for the actual writing
     265              : 
     266              :     bool m_swThreadInit{ true }; ///< Synchronizer to ensure s.w. thread initializes before doing dangerous things.
     267              : 
     268              :     pid_t m_swThreadID{ 0 }; ///< S.w. thread pid.
     269              : 
     270              :     pcf::IndiProperty m_swThreadProp; ///< The property to hold the s.w. thread details.
     271              : 
     272              :     /// Thread starter, called by swThreadStart on thread construction.  Calls swThreadExec.
     273              :     static void swThreadStart( streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */ );
     274              : 
     275              :     /// Execute the stream writer main loop.
     276              :     void swThreadExec();
     277              : 
     278              :     /// Function called when semaphore is raised to do the encode and write.
     279              :     int doEncode();
     280              :     ///@}
     281              : 
     282              :     // INDI:
     283              :   protected:
     284              :     // declare our properties
     285              :     pcf::IndiProperty m_indiP_writing;
     286              : 
     287              :     pcf::IndiProperty m_indiP_xrifStats;
     288              : 
     289              :   public:
     290            0 :     INDI_NEWCALLBACK_DECL( streamWriter, m_indiP_writing );
     291              : 
     292              :     void updateINDI();
     293              : 
     294              :     /** \name Telemeter Interface
     295              :      *
     296              :      * @{
     297              :      */
     298              :     int checkRecordTimes();
     299              : 
     300              :     int recordTelem( const telem_saving_state * );
     301              : 
     302              :     int recordSavingState( bool force = false );
     303              :     int recordSavingStats( bool force = false );
     304              : 
     305              :     ///@}
     306              : };
     307              : 
     308              : // Set self pointer to null so app starts up uninitialized.
     309              : streamWriter *streamWriter::m_selfWriter = nullptr;
     310              : 
     311           24 : streamWriter::streamWriter() : MagAOXApp( MAGAOX_CURRENT_SHA1, MAGAOX_REPO_MODIFIED )
     312              : {
     313            8 :     m_powerMgtEnabled = false;
     314              : 
     315            8 :     m_selfWriter = this;
     316              : 
     317            8 :     return;
     318            0 : }
     319              : 
     320            8 : streamWriter::~streamWriter() noexcept
     321              : {
     322            8 :     if( m_xrif )
     323            4 :         xrif_delete( m_xrif );
     324              : 
     325            8 :     if( m_xrif_header )
     326            4 :         free( m_xrif_header );
     327              : 
     328            8 :     if( m_xrif_timing )
     329            4 :         xrif_delete( m_xrif_timing );
     330              : 
     331            8 :     if( m_xrif_timing_header )
     332            4 :         free( m_xrif_timing_header );
     333              : 
     334            8 :     return;
     335            8 : }
     336              : 
     337            0 : void streamWriter::setupConfig()
     338              : {
     339            0 :     config.add( "writer.savePath",
     340              :                 "",
     341              :                 "writer.savePath",
     342              :                 argType::Required,
     343              :                 "writer",
     344              :                 "savePath",
     345              :                 false,
     346              :                 "string",
     347              :                 "The absolute path where images are saved. Will use MagAO-X default if not set." );
     348              : 
     349            0 :     config.add( "writer.maxCircBuffLength",
     350              :                 "",
     351              :                 "writer.maxCircBuffLength",
     352              :                 argType::Required,
     353              :                 "writer",
     354              :                 "maxCircBuffLength",
     355              :                 false,
     356              :                 "size_t",
     357              :                 "The maximum length in frames of the circular buffer. Should be an integer multiple of and larger than "
     358              :                 "maxWriteChunkLength." );
     359              : 
     360            0 :     config.add( "writer.maxCircBuffSize",
     361              :                 "",
     362              :                 "writer.maxCircBuffSize",
     363              :                 argType::Required,
     364              :                 "writer",
     365              :                 "maxCircBuffSize",
     366              :                 false,
     367              :                 "double",
     368              :                 "The maximum size in MB of the circular buffer. Should be sized to hold at least 2 of the maximum "
     369              :                 "frame size." );
     370              : 
     371            0 :     config.add(
     372              :         "writer.maxWriteChunkLength",
     373              :         "",
     374              :         "writer.maxWriteChunkLength",
     375              :         argType::Required,
     376              :         "writer",
     377              :         "maxWriteChunkLength",
     378              :         false,
     379              :         "size_t",
     380              :         "The maximum length in frames of the chunks to write to disk. Should be smaller than maxCircBuffLength." );
     381              : 
     382            0 :     config.add( "writer.maxChunkTime",
     383              :                 "",
     384              :                 "writer.maxChunkTime",
     385              :                 argType::Required,
     386              :                 "writer",
     387              :                 "maxChunkTime",
     388              :                 false,
     389              :                 "float",
     390              :                 "The max length in seconds of the chunks to write to disk. Default is 60 sec." );
     391              : 
     392            0 :     config.add( "writer.threadPrio",
     393              :                 "",
     394              :                 "writer.threadPrio",
     395              :                 argType::Required,
     396              :                 "writer",
     397              :                 "threadPrio",
     398              :                 false,
     399              :                 "int",
     400              :                 "The real-time priority of the stream writer thread." );
     401              : 
     402            0 :     config.add( "writer.cpuset",
     403              :                 "",
     404              :                 "writer.cpuset",
     405              :                 argType::Required,
     406              :                 "writer",
     407              :                 "cpuset",
     408              :                 false,
     409              :                 "int",
     410              :                 "The cpuset for the writer thread." );
     411              : 
     412            0 :     config.add( "writer.compress",
     413              :                 "",
     414              :                 "writer.compress",
     415              :                 argType::Required,
     416              :                 "writer",
     417              :                 "compress",
     418              :                 false,
     419              :                 "bool",
     420              :                 "Flag to set whether compression is used.  Default true." );
     421              : 
     422            0 :     config.add( "writer.lz4accel",
     423              :                 "",
     424              :                 "writer.lz4accel",
     425              :                 argType::Required,
     426              :                 "writer",
     427              :                 "lz4accel",
     428              :                 false,
     429              :                 "int",
     430              :                 "The LZ4 acceleration parameter.  Larger is faster, but lower compression." );
     431              : 
     432            0 :     config.add( "writer.outName",
     433              :                 "",
     434              :                 "writer.outName",
     435              :                 argType::Required,
     436              :                 "writer",
     437              :                 "outName",
     438              :                 false,
     439              :                 "int",
     440              :                 "The name to use for output files.  Default is the shmimName." );
     441              : 
     442            0 :     config.add( "framegrabber.shmimName",
     443              :                 "",
     444              :                 "framegrabber.shmimName",
     445              :                 argType::Required,
     446              :                 "framegrabber",
     447              :                 "shmimName",
     448              :                 false,
     449              :                 "int",
     450              :                 "The name of the stream to monitor. From /tmp/shmimName.im.shm." );
     451              : 
     452            0 :     config.add( "framegrabber.semaphoreNumber",
     453              :                 "",
     454              :                 "framegrabber.semaphoreNumber",
     455              :                 argType::Required,
     456              :                 "framegrabber",
     457              :                 "semaphoreNumber",
     458              :                 false,
     459              :                 "int",
     460              :                 "The semaphore to wait on. Default is 7." );
     461              : 
     462            0 :     config.add( "framegrabber.semWait",
     463              :                 "",
     464              :                 "framegrabber.semWait",
     465              :                 argType::Required,
     466              :                 "framegrabber",
     467              :                 "semWait",
     468              :                 false,
     469              :                 "int",
     470              :                 "The time in nsec to wait on the semaphore.  Max is 999999999. Default is 5e8 nsec." );
     471              : 
     472            0 :     config.add( "framegrabber.threadPrio",
     473              :                 "",
     474              :                 "framegrabber.threadPrio",
     475              :                 argType::Required,
     476              :                 "framegrabber",
     477              :                 "threadPrio",
     478              :                 false,
     479              :                 "int",
     480              :                 "The real-time priority of the framegrabber thread." );
     481              : 
     482            0 :     config.add( "framegrabber.cpuset",
     483              :                 "",
     484              :                 "framegrabber.cpuset",
     485              :                 argType::Required,
     486              :                 "framegrabber",
     487              :                 "cpuset",
     488              :                 false,
     489              :                 "string",
     490              :                 "The cpuset for the framegrabber thread." );
     491              : 
     492            0 :     telemeterT::setupConfig( config );
     493            0 : }
     494              : 
     495            0 : void streamWriter::loadConfig()
     496              : {
     497              : 
     498            0 :     config( m_maxCircBuffLength, "writer.maxCircBuffLength" );
     499            0 :     config( m_maxCircBuffSize, "writer.maxCircBuffSize" );
     500            0 :     config( m_maxWriteChunkLength, "writer.maxWriteChunkLength" );
     501            0 :     config( m_maxChunkTime, "writer.maxChunkTime" );
     502            0 :     config( m_swThreadPrio, "writer.threadPrio" );
     503            0 :     config( m_swCpuset, "writer.cpuset" );
     504            0 :     config( m_compress, "writer.compress" );
     505            0 :     config( m_lz4accel, "writer.lz4accel" );
     506            0 :     if( m_lz4accel < XRIF_LZ4_ACCEL_MIN )
     507              :     {
     508            0 :         m_lz4accel = XRIF_LZ4_ACCEL_MIN;
     509              :     }
     510            0 :     if( m_lz4accel > XRIF_LZ4_ACCEL_MAX )
     511              :     {
     512            0 :         m_lz4accel = XRIF_LZ4_ACCEL_MAX;
     513              :     }
     514              : 
     515            0 :     config( m_shmimName, "framegrabber.shmimName" );
     516              : 
     517            0 :     m_outName = m_shmimName;
     518            0 :     config( m_outName, "writer.outName" );
     519              : 
     520            0 :     config( m_semaphoreNumber, "framegrabber.semaphoreNumber" );
     521            0 :     config( m_semWaitNSec, "framegrabber.semWait" );
     522              : 
     523            0 :     config( m_fgThreadPrio, "framegrabber.threadPrio" );
     524            0 :     config( m_fgCpuset, "framegrabber.cpuset" );
     525              : 
     526              :     // Set some defaults
     527              :     // Setup default saving path
     528            0 :     std::string tmpstr = mx::sys::getEnv( MAGAOX_env_rawimage );
     529            0 :     if( tmpstr == "" )
     530              :     {
     531            0 :         tmpstr = MAGAOX_rawimageRelPath;
     532              :     }
     533            0 :     m_rawimageDir = basePath() + "/" + tmpstr ;
     534              : 
     535            0 :     config( m_rawimageDir, "writer.savePath" );
     536              : 
     537            0 :     if( telemeterT::loadConfig( config ) < 0 )
     538              :     {
     539            0 :         log<text_log>( "Error during telemeter config", logPrio::LOG_CRITICAL );
     540            0 :         m_shutdown = true;
     541              :     }
     542            0 : }
     543              : 
     544            0 : int streamWriter::appStartup()
     545              : {
     546              :     // Create save directory.
     547            0 :     errno = 0;
     548            0 :     if( mkdir( m_rawimageDir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH ) < 0 )
     549              :     {
     550            0 :         if( errno != EEXIST )
     551              :         {
     552            0 :             std::stringstream logss;
     553            0 :             logss << "Failed to create image directory (" << m_rawimageDir << ").  Errno says: " << strerror( errno );
     554            0 :             log<software_critical>( { __FILE__, __LINE__, errno, 0, logss.str() } );
     555              : 
     556            0 :             return -1;
     557            0 :         }
     558              :     }
     559              : 
     560              :     // set up the  INDI properties
     561            0 :     createStandardIndiToggleSw( m_indiP_writing, "writing" );
     562            0 :     registerIndiPropertyNew( m_indiP_writing, INDI_NEWCALLBACK( m_indiP_writing ) );
     563              : 
     564              :     // Register the stats INDI property
     565            0 :     REG_INDI_NEWPROP_NOCB( m_indiP_xrifStats, "xrif", pcf::IndiProperty::Number );
     566            0 :     m_indiP_xrifStats.setLabel( "xrif compression performance" );
     567              : 
     568            0 :     indi::addNumberElement<float>( m_indiP_xrifStats, "ratio", 0, 1.0, 0.0, "%0.2f", "Compression Ratio" );
     569              : 
     570            0 :     indi::addNumberElement<float>( m_indiP_xrifStats,
     571              :                                    "differenceMBsec",
     572              :                                    0,
     573            0 :                                    std::numeric_limits<float>::max(),
     574            0 :                                    0.0,
     575              :                                    "%0.2f",
     576              :                                    "Differencing Rate [MB/sec]" );
     577              : 
     578            0 :     indi::addNumberElement<float>( m_indiP_xrifStats,
     579              :                                    "reorderMBsec",
     580              :                                    0,
     581            0 :                                    std::numeric_limits<float>::max(),
     582            0 :                                    0.0,
     583              :                                    "%0.2f",
     584              :                                    "Reordering Rate [MB/sec]" );
     585              : 
     586            0 :     indi::addNumberElement<float>( m_indiP_xrifStats,
     587              :                                    "compressMBsec",
     588              :                                    0,
     589            0 :                                    std::numeric_limits<float>::max(),
     590            0 :                                    0.0,
     591              :                                    "%0.2f",
     592              :                                    "Compression Rate [MB/sec]" );
     593              : 
     594            0 :     indi::addNumberElement<float>( m_indiP_xrifStats,
     595              :                                    "encodeMBsec",
     596              :                                    0,
     597            0 :                                    std::numeric_limits<float>::max(),
     598            0 :                                    0.0,
     599              :                                    "%0.2f",
     600              :                                    "Total Encoding Rate [MB/sec]" );
     601              : 
     602            0 :     indi::addNumberElement<float>( m_indiP_xrifStats,
     603              :                                    "differenceFPS",
     604              :                                    0,
     605            0 :                                    std::numeric_limits<float>::max(),
     606            0 :                                    0.0,
     607              :                                    "%0.2f",
     608              :                                    "Differencing Rate [f.p.s.]" );
     609              : 
     610            0 :     indi::addNumberElement<float>( m_indiP_xrifStats,
     611              :                                    "reorderFPS",
     612              :                                    0,
     613            0 :                                    std::numeric_limits<float>::max(),
     614            0 :                                    0.0,
     615              :                                    "%0.2f",
     616              :                                    "Reordering Rate [f.p.s.]" );
     617              : 
     618            0 :     indi::addNumberElement<float>( m_indiP_xrifStats,
     619              :                                    "compressFPS",
     620              :                                    0,
     621            0 :                                    std::numeric_limits<float>::max(),
     622            0 :                                    0.0,
     623              :                                    "%0.2f",
     624              :                                    "Compression Rate [f.p.s.]" );
     625              : 
     626            0 :     indi::addNumberElement<float>( m_indiP_xrifStats,
     627              :                                    "encodeFPS",
     628              :                                    0,
     629            0 :                                    std::numeric_limits<float>::max(),
     630            0 :                                    0.0,
     631              :                                    "%0.2f",
     632              :                                    "Total Encoding Rate [f.p.s.]" );
     633              : 
     634              :     // Now set up the framegrabber and writer threads.
     635              :     //  - need SIGSEGV and SIGBUS handling for ImageStreamIO restarts
     636              :     //  - initialize the semaphore
     637              :     //  - start the threads
     638              : 
     639            0 :     if( setSigSegvHandler() < 0 )
     640            0 :         return log<software_error, -1>( { __FILE__, __LINE__ } );
     641              : 
     642            0 :     if( sem_init( &m_swSemaphore, 0, 0 ) < 0 )
     643            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "Initializing S.W. semaphore" } );
     644              : 
     645              :     // Check if we have a safe writeChunkLengthh
     646            0 :     if( m_maxCircBuffLength % m_maxWriteChunkLength != 0 )
     647              :     {
     648            0 :         return log<software_critical, -1>(
     649            0 :             { __FILE__, __LINE__, "Write chunk length is not a divisor of circular buffer length." } );
     650              :     }
     651              : 
     652            0 :     if( initialize_xrif() < 0 )
     653              :     {
     654            0 :         log<software_critical, -1>( { __FILE__, __LINE__ } );
     655              :     }
     656              : 
     657            0 :     if( threadStart( m_fgThread,
     658            0 :                      m_fgThreadInit,
     659            0 :                      m_fgThreadID,
     660            0 :                      m_fgThreadProp,
     661              :                      m_fgThreadPrio,
     662            0 :                      m_fgCpuset,
     663              :                      "framegrabber",
     664              :                      this,
     665            0 :                      fgThreadStart ) < 0 )
     666              :     {
     667            0 :         return log<software_critical, -1>( { __FILE__, __LINE__ } );
     668              :     }
     669              : 
     670            0 :     if( threadStart( m_swThread,
     671            0 :                      m_swThreadInit,
     672            0 :                      m_swThreadID,
     673            0 :                      m_swThreadProp,
     674              :                      m_swThreadPrio,
     675            0 :                      m_swCpuset,
     676              :                      "streamwriter",
     677              :                      this,
     678            0 :                      swThreadStart ) < 0 )
     679              :     {
     680            0 :         log<software_critical, -1>( { __FILE__, __LINE__ } );
     681              :     }
     682              : 
     683            0 :     if( telemeterT::appStartup() < 0 )
     684              :     {
     685            0 :         return log<software_error, -1>( { __FILE__, __LINE__ } );
     686              :     }
     687              : 
     688            0 :     return 0;
     689              : }
     690              : 
     691            0 : int streamWriter::appLogic()
     692              : {
     693              : 
     694              :     // first do a join check to see if other threads have exited.
     695              :     // these will throw if the threads are really gone
     696              :     try
     697              :     {
     698            0 :         if( pthread_tryjoin_np( m_fgThread.native_handle(), 0 ) == 0 )
     699              :         {
     700            0 :             log<software_error>( { __FILE__, __LINE__, "framegrabber thread has exited" } );
     701            0 :             return -1;
     702              :         }
     703              :     }
     704            0 :     catch( ... )
     705              :     {
     706            0 :         log<software_error>( { __FILE__, __LINE__, "framegrabber thread has exited" } );
     707            0 :         return -1;
     708            0 :     }
     709              : 
     710              :     try
     711              :     {
     712            0 :         if( pthread_tryjoin_np( m_swThread.native_handle(), 0 ) == 0 )
     713              :         {
     714            0 :             log<software_error>( { __FILE__, __LINE__, "stream thread has exited" } );
     715            0 :             return -1;
     716              :         }
     717              :     }
     718            0 :     catch( ... )
     719              :     {
     720            0 :         log<software_error>( { __FILE__, __LINE__, "streamwriter thread has exited" } );
     721            0 :         return -1;
     722            0 :     }
     723              : 
     724            0 :     switch( m_writing )
     725              :     {
     726            0 :     case NOT_WRITING:
     727            0 :         state( stateCodes::READY );
     728            0 :         break;
     729            0 :     default:
     730            0 :         state( stateCodes::OPERATING );
     731              :     }
     732              : 
     733            0 :     if( state() == stateCodes::OPERATING )
     734              :     {
     735            0 :         if( telemeterT::appLogic() < 0 )
     736              :         {
     737            0 :             log<software_error>( { __FILE__, __LINE__ } );
     738            0 :             return 0;
     739              :         }
     740              :     }
     741              : 
     742            0 :     updateINDI();
     743              : 
     744            0 :     return 0;
     745              : }
     746              : 
     747            0 : int streamWriter::appShutdown()
     748              : {
     749            0 :     m_writing = NOT_WRITING;
     750            0 :     updateINDI();
     751              : 
     752              :     try
     753              :     {
     754            0 :         if( m_fgThread.joinable() )
     755              :         {
     756            0 :             m_fgThread.join();
     757              :         }
     758              :     }
     759            0 :     catch( ... )
     760              :     {
     761            0 :     }
     762              : 
     763              :     try
     764              :     {
     765            0 :         if( m_swThread.joinable() )
     766              :         {
     767            0 :             m_swThread.join();
     768              :         }
     769              :     }
     770            0 :     catch( ... )
     771              :     {
     772            0 :     }
     773              : 
     774            0 :     if( m_xrif )
     775              :     {
     776            0 :         xrif_delete( m_xrif );
     777            0 :         m_xrif = nullptr;
     778              :     }
     779              : 
     780            0 :     if( m_xrif_timing )
     781              :     {
     782            0 :         xrif_delete( m_xrif_timing );
     783            0 :         m_xrif_timing = nullptr;
     784              :     }
     785              : 
     786            0 :     telemeterT::appShutdown();
     787              : 
     788            0 :     return 0;
     789              : }
     790              : 
     791            4 : int streamWriter::initialize_xrif()
     792              : {
     793            4 :     xrif_error_t rv = xrif_new( &m_xrif );
     794            4 :     if( rv != XRIF_NOERROR )
     795              :     {
     796            0 :         return log<software_critical, -1>(
     797            0 :             { __FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error." } );
     798              :     }
     799              : 
     800            4 :     if( m_compress )
     801              :     {
     802            4 :         rv = xrif_configure( m_xrif, XRIF_DIFFERENCE_PREVIOUS, XRIF_REORDER_BYTEPACK, XRIF_COMPRESS_LZ4 );
     803            4 :         if( rv != XRIF_NOERROR )
     804              :         {
     805            0 :             return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
     806              :         }
     807              :     }
     808              :     else
     809              :     {
     810            0 :         std::cerr << "not compressing . . . \n";
     811            0 :         rv = xrif_configure( m_xrif, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE );
     812            0 :         if( rv != XRIF_NOERROR )
     813              :         {
     814            0 :             return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
     815              :         }
     816              :     }
     817              : 
     818            4 :     errno         = 0;
     819              : 
     820            4 :     m_xrif_header = reinterpret_cast<char *>( malloc( XRIF_HEADER_SIZE * sizeof( char ) ) );
     821            4 :     if( m_xrif_header == NULL )
     822              :     {
     823            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "xrif header allocation failed." } );
     824              :     }
     825              : 
     826            4 :     rv = xrif_new( &m_xrif_timing );
     827            4 :     if( rv != XRIF_NOERROR )
     828              :     {
     829            0 :         return log<software_critical, -1>(
     830            0 :             { __FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error." } );
     831              :     }
     832              : 
     833              :     // m_xrif_timing->reorder_method = XRIF_REORDER_NONE;
     834            4 :     rv = xrif_configure( m_xrif_timing, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE );
     835            4 :     if( rv != XRIF_NOERROR )
     836              :     {
     837            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
     838              :     }
     839              : 
     840            4 :     errno                = 0;
     841              : 
     842            4 :     m_xrif_timing_header = reinterpret_cast<char *>( malloc( XRIF_HEADER_SIZE * sizeof( char ) ) );
     843            4 :     if( m_xrif_timing_header == NULL )
     844              :     {
     845            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "xrif header allocation failed." } );
     846              :     }
     847              : 
     848            4 :     return 0;
     849              : }
     850              : 
     851            0 : int streamWriter::setSigSegvHandler()
     852              : {
     853              :     struct sigaction act;
     854              :     sigset_t         set;
     855              : 
     856            0 :     act.sa_sigaction = &streamWriter::_handlerSigSegv;
     857            0 :     act.sa_flags     = SA_SIGINFO;
     858            0 :     sigemptyset( &set );
     859            0 :     act.sa_mask = set;
     860              : 
     861            0 :     errno = 0;
     862            0 :     if( sigaction( SIGSEGV, &act, 0 ) < 0 )
     863              :     {
     864            0 :         std::string logss = "Setting handler for SIGSEGV failed. Errno says: ";
     865            0 :         logss += strerror( errno );
     866              : 
     867            0 :         log<software_error>( { __FILE__, __LINE__, errno, 0, logss } );
     868              : 
     869            0 :         return -1;
     870            0 :     }
     871              : 
     872            0 :     errno = 0;
     873            0 :     if( sigaction( SIGBUS, &act, 0 ) < 0 )
     874              :     {
     875            0 :         std::string logss = "Setting handler for SIGBUS failed. Errno says: ";
     876            0 :         logss += strerror( errno );
     877              : 
     878            0 :         log<software_error>( { __FILE__, __LINE__, errno, 0, logss } );
     879              : 
     880            0 :         return -1;
     881            0 :     }
     882              : 
     883            0 :     log<text_log>( "Installed SIGSEGV/SIGBUS signal handler.", logPrio::LOG_DEBUG );
     884              : 
     885            0 :     return 0;
     886              : }
     887              : 
     888            0 : void streamWriter::_handlerSigSegv( int signum, siginfo_t *siginf, void *ucont )
     889              : {
     890            0 :     m_selfWriter->handlerSigSegv( signum, siginf, ucont );
     891            0 : }
     892              : 
     893            0 : void streamWriter::handlerSigSegv( int signum, siginfo_t *siginf, void *ucont )
     894              : {
     895              :     static_cast<void>( signum );
     896              :     static_cast<void>( siginf );
     897              :     static_cast<void>( ucont );
     898              : 
     899            0 :     m_restart = true;
     900              : 
     901            0 :     return;
     902              : }
     903              : 
     904           12 : void streamWriter::getCircBuffLengths( size_t  &circBuffLength,
     905              :                                        double  &circBuffSize,
     906              :                                        size_t  &writeChunkLength,
     907              :                                        size_t   maxCircBuffLength,
     908              :                                        double   maxCircBuffSize,
     909              :                                        size_t   maxWriteChunkLength,
     910              :                                        uint32_t width,
     911              :                                        uint32_t height,
     912              :                                        size_t   typeSize )
     913              : {
     914              :     static constexpr double MB = 1048576.0;
     915              : 
     916           12 :     size_t isz = width * height * typeSize * maxCircBuffLength;
     917              : 
     918           12 :     if( isz <= maxCircBuffSize * MB )
     919              :     {
     920            6 :         circBuffLength   = maxCircBuffLength;
     921            6 :         circBuffSize     = isz / MB;
     922            6 :         writeChunkLength = maxWriteChunkLength;
     923              : 
     924            6 :         return;
     925              :     }
     926              : 
     927            6 :     circBuffLength = maxCircBuffSize * MB / ( width * height * typeSize );
     928              : 
     929            6 :     if( circBuffLength % 2 == 1 )
     930              :     {
     931            1 :         --circBuffLength;
     932              :     }
     933              : 
     934            6 :     circBuffSize = ( width * height * typeSize * circBuffLength ) / MB;
     935              : 
     936            6 :     writeChunkLength = ( 1.0 * maxWriteChunkLength / maxCircBuffLength ) * circBuffLength;
     937              : 
     938            6 :     if( circBuffLength == 0 )
     939              :     {
     940            2 :         return;
     941              :     }
     942              : 
     943            4 :     if( writeChunkLength == 0 )
     944              :     {
     945            0 :         writeChunkLength = 1;
     946              :     }
     947              : 
     948            4 :     while( circBuffLength % writeChunkLength != 0 )
     949              :     {
     950            0 :         --writeChunkLength;
     951              :     }
     952              : 
     953            4 :     return;
     954              : }
     955              : 
     956            4 : int streamWriter::allocate_circbufs()
     957              : {
     958              : 
     959            4 :     getCircBuffLengths( m_circBuffLength,
     960            4 :                         m_circBuffSize,
     961            4 :                         m_writeChunkLength,
     962              :                         m_maxCircBuffLength,
     963              :                         m_maxCircBuffSize,
     964              :                         m_maxWriteChunkLength,
     965            4 :                         m_width,
     966            4 :                         m_height,
     967            4 :                         m_typeSize );
     968              : 
     969            4 :     if( m_circBuffLength < 2 )
     970              :     {
     971            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, "frame size too large to fit in maxCircBuffSize" } );
     972              :     }
     973              : 
     974            4 :     if( m_writeChunkLength >= m_circBuffLength )
     975              :     {
     976            0 :         return log<software_critical, -1>(
     977            0 :             { __FILE__, __LINE__, "writeChunkLength is not smaller than circBuffLength" } );
     978              :     }
     979              : 
     980            4 :     if( m_circBuffLength % m_writeChunkLength != 0 )
     981              :     {
     982            0 :         return log<software_critical, -1>(
     983            0 :             { __FILE__, __LINE__, "writeChunkLength is not an integer factor of circBuffLength" } );
     984              :     }
     985              : 
     986            4 :     std::string msg = "Set circ buff length: " + std::to_string( m_circBuffLength ) + " frames (";
     987            4 :     msg += std::to_string( m_circBuffSize ) + " MB).  Write chunk length: " + std::to_string( m_writeChunkLength );
     988            4 :     msg += " frames.";
     989              : 
     990            4 :     log<text_log>( msg, logPrio::LOG_NOTICE );
     991              : 
     992            4 :     if( m_rawImageCircBuff )
     993              :     {
     994            0 :         free( m_rawImageCircBuff );
     995              :     }
     996              : 
     997            4 :     errno              = 0;
     998            4 :     m_rawImageCircBuff = reinterpret_cast<char *>( malloc( m_width * m_height * m_typeSize * m_circBuffLength ) );
     999              : 
    1000            4 :     if( m_rawImageCircBuff == NULL )
    1001              :     {
    1002            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "buffer allocation failure" } );
    1003              :     }
    1004              : 
    1005            4 :     if( m_timingCircBuff )
    1006              :     {
    1007            0 :         free( m_timingCircBuff );
    1008              :     }
    1009              : 
    1010            4 :     errno            = 0;
    1011            4 :     m_timingCircBuff = reinterpret_cast<uint64_t *>( malloc( 5 * sizeof( uint64_t ) * m_circBuffLength ) );
    1012            4 :     if( m_timingCircBuff == NULL )
    1013              :     {
    1014            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "buffer allocation failure" } );
    1015              :     }
    1016              : 
    1017            4 :     return 0;
    1018            4 : }
    1019              : 
    1020            4 : int streamWriter::allocate_xrif()
    1021              : {
    1022              :     // Set up the image data xrif handle
    1023              :     xrif_error_t rv;
    1024              : 
    1025            4 :     if( m_compress )
    1026              :     {
    1027            4 :         rv = xrif_configure( m_xrif, XRIF_DIFFERENCE_PREVIOUS, XRIF_REORDER_BYTEPACK, XRIF_COMPRESS_LZ4 );
    1028            4 :         if( rv != XRIF_NOERROR )
    1029              :         {
    1030            0 :             return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
    1031              :         }
    1032              :     }
    1033              :     else
    1034              :     {
    1035            0 :         std::cerr << "not compressing . . . \n";
    1036            0 :         rv = xrif_configure( m_xrif, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE );
    1037            0 :         if( rv != XRIF_NOERROR )
    1038              :         {
    1039            0 :             return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
    1040              :         }
    1041              :     }
    1042              : 
    1043            4 :     rv = xrif_set_size( m_xrif, m_width, m_height, 1, m_writeChunkLength, m_dataType );
    1044            4 :     if( rv != XRIF_NOERROR )
    1045              :     {
    1046            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_set_size error." } );
    1047              :     }
    1048              : 
    1049            4 :     rv = xrif_allocate_raw( m_xrif );
    1050            4 :     if( rv != XRIF_NOERROR )
    1051              :     {
    1052            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_raw error." } );
    1053              :     }
    1054              : 
    1055            4 :     rv = xrif_allocate_reordered( m_xrif );
    1056            4 :     if( rv != XRIF_NOERROR )
    1057              :     {
    1058            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error." } );
    1059              :     }
    1060              : 
    1061              :     // Set up the timing data xrif handle
    1062            4 :     rv = xrif_configure( m_xrif_timing, XRIF_DIFFERENCE_NONE, XRIF_REORDER_NONE, XRIF_COMPRESS_NONE );
    1063            4 :     if( rv != XRIF_NOERROR )
    1064              :     {
    1065            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif handle configuration error." } );
    1066              :     }
    1067              : 
    1068            4 :     rv = xrif_set_size( m_xrif_timing, 5, 1, 1, m_writeChunkLength, XRIF_TYPECODE_UINT64 );
    1069            4 :     if( rv != XRIF_NOERROR )
    1070              :     {
    1071            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_set_size error." } );
    1072              :     }
    1073              : 
    1074            4 :     rv = xrif_allocate_raw( m_xrif_timing );
    1075            4 :     if( rv != XRIF_NOERROR )
    1076              :     {
    1077            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_raw error." } );
    1078              :     }
    1079              : 
    1080            4 :     rv = xrif_allocate_reordered( m_xrif_timing );
    1081            4 :     if( rv != XRIF_NOERROR )
    1082              :     {
    1083            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error." } );
    1084              :     }
    1085              : 
    1086            4 :     return 0;
    1087              : }
    1088              : 
    1089            0 : void streamWriter::fgThreadStart( streamWriter *o )
    1090              : {
    1091            0 :     o->fgThreadExec();
    1092            0 : }
    1093              : 
    1094            0 : void streamWriter::fgThreadExec()
    1095              : {
    1096            0 :     m_fgThreadID = syscall( SYS_gettid );
    1097              : 
    1098              :     // Wait fpr the thread starter to finish initializing this thread.
    1099            0 :     while( m_fgThreadInit == true && m_shutdown == 0 )
    1100              :     {
    1101            0 :         sleep( 1 );
    1102              :     }
    1103              : 
    1104              :     timespec missing_ts;
    1105              : 
    1106              :     IMAGE image;
    1107            0 :     ino_t inode = 0; // The inode of the image stream file
    1108              : 
    1109            0 :     bool opened = false;
    1110              : 
    1111            0 :     while( m_shutdown == 0 )
    1112              :     {
    1113              :         /* Initialize ImageStreamIO
    1114              :          */
    1115            0 :         opened    = false;
    1116            0 :         m_restart = false; // Set this up front, since we're about to restart.
    1117              : 
    1118            0 :         sem_t *sem{ nullptr }; ///< The semaphore to monitor for new image data
    1119              : 
    1120            0 :         int logged = 0;
    1121            0 :         while( !opened && !m_shutdown && !m_restart )
    1122              :         {
    1123              :             // b/c ImageStreamIO prints every single time, and latest version don't support stopping it yet,
    1124              :             // and that isn't thread-safe-able anyway
    1125              :             // we do our own checks.  This is the same code in ImageStreamIO_openIm...
    1126              :             int  SM_fd;
    1127              :             char SM_fname[200];
    1128            0 :             ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
    1129            0 :             SM_fd = open( SM_fname, O_RDWR );
    1130            0 :             if( SM_fd == -1 )
    1131              :             {
    1132            0 :                 if( !logged )
    1133              :                 {
    1134            0 :                     log<text_log>( "ImageStream " + m_shmimName + " not found (yet).  Retrying . . .",
    1135              :                                    logPrio::LOG_NOTICE );
    1136              :                 }
    1137            0 :                 logged = 1;
    1138            0 :                 sleep( 1 ); // be patient
    1139            0 :                 continue;
    1140              :             }
    1141              : 
    1142              :             // Found and opened,  close it and then use ImageStreamIO
    1143            0 :             logged = 0;
    1144            0 :             close( SM_fd );
    1145              : 
    1146            0 :             if( ImageStreamIO_openIm( &image, m_shmimName.c_str() ) == 0 )
    1147              :             {
    1148            0 :                 if( image.md[0].sem < SEMAPHORE_MAXVAL )
    1149              :                 {
    1150            0 :                     ImageStreamIO_closeIm( &image );
    1151            0 :                     mx::sys::sleep( 1 ); // We just need to wait for the server process to finish startup.
    1152              :                 }
    1153              :                 else
    1154              :                 {
    1155            0 :                     opened = true;
    1156              : 
    1157              :                     char SM_fname[200];
    1158            0 :                     ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
    1159              : 
    1160              :                     struct stat buffer;
    1161            0 :                     int         rv = stat( SM_fname, &buffer );
    1162              : 
    1163            0 :                     if( rv != 0 )
    1164              :                     {
    1165            0 :                         log<software_critical>( { __FILE__,
    1166              :                                                   __LINE__,
    1167            0 :                                                   errno,
    1168            0 :                                                   "Could not get inode for " + m_shmimName +
    1169              :                                                       ". Source process will need to be restarted." } );
    1170            0 :                         ImageStreamIO_closeIm( &image );
    1171            0 :                         return;
    1172              :                     }
    1173            0 :                     inode = buffer.st_ino;
    1174              :                 }
    1175              :             }
    1176              :             else
    1177              :             {
    1178            0 :                 mx::sys::sleep( 1 ); // be patient
    1179              :             }
    1180              :         }
    1181              : 
    1182            0 :         if( m_restart )
    1183            0 :             continue; // this is kinda dumb.  we just go around on restart, so why test in the while loop at all?
    1184              : 
    1185            0 :         if( m_shutdown || !opened )
    1186              :         {
    1187            0 :             if( !opened )
    1188            0 :                 return;
    1189              : 
    1190            0 :             ImageStreamIO_closeIm( &image );
    1191            0 :             return;
    1192              :         }
    1193              : 
    1194              :         // now get a good semaphore
    1195            0 :         m_semaphoreNumber =
    1196            0 :             ImageStreamIO_getsemwaitindex( &image, m_semaphoreNumber ); // ask for semaphore we had before
    1197              : 
    1198            0 :         if( m_semaphoreNumber < 0 )
    1199              :         {
    1200            0 :             log<software_critical>(
    1201              :                 { __FILE__,
    1202              :                   __LINE__,
    1203            0 :                   "No valid semaphore found for " + m_shmimName + ". Source process will need to be restarted." } );
    1204            0 :             return;
    1205              :         }
    1206              : 
    1207            0 :         log<software_info>( { __FILE__,
    1208              :                               __LINE__,
    1209            0 :                               "got semaphore index " + std::to_string( m_semaphoreNumber ) + " for " + m_shmimName } );
    1210              : 
    1211            0 :         ImageStreamIO_semflush( &image, m_semaphoreNumber );
    1212              : 
    1213            0 :         sem = image.semptr[m_semaphoreNumber]; ///< The semaphore to monitor for new image data
    1214              : 
    1215            0 :         m_dataType = image.md[0].datatype;
    1216            0 :         m_typeSize = ImageStreamIO_typesize( m_dataType );
    1217            0 :         m_width    = image.md[0].size[0];
    1218            0 :         m_height   = image.md[0].size[1];
    1219              :         size_t length;
    1220            0 :         if( image.md[0].naxis == 3 )
    1221              :         {
    1222            0 :             length = image.md[0].size[2];
    1223              :         }
    1224              :         else
    1225              :         {
    1226            0 :             length = 1;
    1227              :         }
    1228              :         std::cerr << "connected"
    1229            0 :                   << " " << m_width << "x" << m_height << "x" << (int)m_dataType << " (" << m_typeSize << ")"
    1230            0 :                   << std::endl;
    1231              : 
    1232              :         // Now allocate the circBuffs
    1233            0 :         if( allocate_circbufs() < 0 )
    1234              :         {
    1235            0 :             return; // will cause shutdown!
    1236              :         }
    1237              : 
    1238              :         // And allocate the xrifs
    1239            0 :         if( allocate_xrif() < 0 )
    1240              :         {
    1241            0 :             return; // Will cause shutdown!
    1242              :         }
    1243              : 
    1244              :         uint8_t atype;
    1245              :         size_t  snx, sny, snz;
    1246              : 
    1247              :         uint64_t curr_image; // The current cnt1 index
    1248            0 :         m_currImage      = 0;
    1249            0 :         m_currChunkStart = 0;
    1250            0 :         m_nextChunkStart = 0;
    1251              : 
    1252              :         // Initialized curr_image ...
    1253            0 :         if( image.md[0].naxis > 2 )
    1254              :         {
    1255            0 :             curr_image = image.md[0].cnt1;
    1256              :         }
    1257              :         else
    1258              :         {
    1259            0 :             curr_image = 0;
    1260              :         }
    1261              : 
    1262              :         uint64_t last_cnt0; // = ((uint64_t)-1);
    1263              : 
    1264              :         // so we can initialize last_cnt0 to avoid frame skip on startup
    1265            0 :         if( image.cntarray )
    1266              :         {
    1267            0 :             last_cnt0 = image.cntarray[curr_image];
    1268              :         }
    1269              :         else
    1270              :         {
    1271            0 :             last_cnt0 = image.md[0].cnt0;
    1272              :         }
    1273              : 
    1274            0 :         int cnt0flag = 0;
    1275              : 
    1276            0 :         bool restartWriting = false; // flag to prevent logging on a logging restart
    1277              : 
    1278              :         // This is the main image grabbing loop.
    1279            0 :         while( !m_shutdown && !m_restart )
    1280              :         {
    1281              :             timespec ts;
    1282            0 :             XWC_SEM_WAIT_TS_RETVOID( ts, m_semWaitSec, m_semWaitNSec );
    1283              : 
    1284            0 :             if( sem_timedwait( sem, &ts ) == 0 )
    1285              :             {
    1286            0 :                 if( image.md[0].naxis > 2 )
    1287              :                 {
    1288            0 :                     curr_image = image.md[0].cnt1;
    1289              :                 }
    1290              :                 else
    1291              :                 {
    1292            0 :                     curr_image = 0;
    1293              :                 }
    1294              : 
    1295            0 :                 atype = image.md[0].datatype;
    1296            0 :                 snx   = image.md[0].size[0];
    1297            0 :                 sny   = image.md[0].size[1];
    1298            0 :                 if( image.md[0].naxis == 3 )
    1299              :                 {
    1300            0 :                     snz = image.md[0].size[2];
    1301              :                 }
    1302              :                 else
    1303              :                 {
    1304            0 :                     snz = 1;
    1305              :                 }
    1306              : 
    1307            0 :                 if( atype != m_dataType || snx != m_width || sny != m_height || snz != length )
    1308              :                 {
    1309              :                     break; // exit the nearest while loop and get the new image setup.
    1310              :                 }
    1311              : 
    1312            0 :                 if( m_shutdown || m_restart )
    1313              :                 {
    1314              :                     break; // Check for exit signals
    1315              :                 }
    1316              : 
    1317              :                 uint64_t new_cnt0;
    1318            0 :                 if( image.cntarray )
    1319              :                 {
    1320            0 :                     new_cnt0 = image.cntarray[curr_image];
    1321              :                 }
    1322              :                 else
    1323              :                 {
    1324            0 :                     new_cnt0 = image.md[0].cnt0;
    1325              :                 }
    1326              : 
    1327              : #ifdef SW_DEBUG
    1328              :                 std::cerr << "new_cnt0: " << new_cnt0 << "\n";
    1329              : #endif
    1330              : 
    1331              :                 ///\todo cleanup skip frame handling.
    1332            0 :                 if( new_cnt0 == last_cnt0 ) //<- this probably isn't useful really
    1333              :                 {
    1334            0 :                     log<text_log>( "semaphore raised but cnt0 has not changed -- we're probably getting behind",
    1335              :                                    logPrio::LOG_WARNING );
    1336            0 :                     ++cnt0flag;
    1337            0 :                     if( cnt0flag > 10 )
    1338              :                     {
    1339            0 :                         m_restart = true; // if we get here 10 times then something else is wrong.
    1340              :                     }
    1341            0 :                     continue;
    1342              :                 }
    1343              : 
    1344            0 :                 if( new_cnt0 - last_cnt0 > 1 ) //<- this is what we want to check.
    1345              :                 {
    1346            0 :                     log<text_log>( "cnt0 changed by more than 1. Frame skipped.", logPrio::LOG_WARNING );
    1347              :                 }
    1348              : 
    1349            0 :                 cnt0flag = 0;
    1350              : 
    1351            0 :                 last_cnt0 = new_cnt0;
    1352              : 
    1353            0 :                 char *curr_dest = m_rawImageCircBuff + m_currImage * m_width * m_height * m_typeSize;
    1354            0 :                 char *curr_src =
    1355            0 :                     reinterpret_cast<char *>( image.array.raw ) + curr_image * m_width * m_height * m_typeSize;
    1356              : 
    1357            0 :                 memcpy( curr_dest, curr_src, m_width * m_height * m_typeSize );
    1358              : 
    1359            0 :                 uint64_t *curr_timing = m_timingCircBuff + 5 * m_currImage;
    1360              : 
    1361            0 :                 if( image.cntarray )
    1362              :                 {
    1363            0 :                     curr_timing[0] = image.cntarray[curr_image];
    1364            0 :                     curr_timing[1] = image.atimearray[curr_image].tv_sec;
    1365            0 :                     curr_timing[2] = image.atimearray[curr_image].tv_nsec;
    1366            0 :                     curr_timing[3] = image.writetimearray[curr_image].tv_sec;
    1367            0 :                     curr_timing[4] = image.writetimearray[curr_image].tv_nsec;
    1368              :                 }
    1369              :                 else
    1370              :                 {
    1371            0 :                     curr_timing[0] = image.md[0].cnt0;
    1372            0 :                     curr_timing[1] = image.md[0].atime.tv_sec;
    1373            0 :                     curr_timing[2] = image.md[0].atime.tv_nsec;
    1374            0 :                     curr_timing[3] = image.md[0].writetime.tv_sec;
    1375            0 :                     curr_timing[4] = image.md[0].writetime.tv_nsec;
    1376              :                 }
    1377              : 
    1378              :                 // Check if we need to time-stamp ourselves -- for old cacao streams
    1379            0 :                 if( curr_timing[1] == 0 )
    1380              :                 {
    1381              : 
    1382            0 :                     if( clock_gettime( CLOCK_REALTIME, &missing_ts ) < 0 )
    1383              :                     {
    1384            0 :                         log<software_critical>( { __FILE__, __LINE__, errno, 0, "clock_gettime" } );
    1385            0 :                         return;
    1386              :                     }
    1387              : 
    1388            0 :                     curr_timing[1] = missing_ts.tv_sec;
    1389            0 :                     curr_timing[2] = missing_ts.tv_nsec;
    1390              :                 }
    1391              : 
    1392              :                 // just set w-time to a-time if it's missing
    1393            0 :                 if( curr_timing[3] == 0 )
    1394              :                 {
    1395            0 :                     curr_timing[3] = curr_timing[1];
    1396            0 :                     curr_timing[4] = curr_timing[2];
    1397              :                 }
    1398              : 
    1399            0 :                 m_currImageTime = 1.0 * curr_timing[3] + ( 1.0 * curr_timing[4] ) / 1e9;
    1400              : 
    1401            0 :                 if( m_shutdown && m_writing == WRITING )
    1402              :                 {
    1403            0 :                     m_writing = STOP_WRITING;
    1404              :                 }
    1405              : 
    1406            0 :                 switch( m_writing )
    1407              :                 {
    1408            0 :                 case START_WRITING:
    1409              : 
    1410            0 :                     m_currChunkStart     = m_currImage;
    1411            0 :                     m_nextChunkStart     = ( m_currImage / m_writeChunkLength ) * m_writeChunkLength;
    1412            0 :                     m_currChunkStartTime = m_currImageTime;
    1413              : 
    1414            0 :                     if( !restartWriting ) // We only log if this is really a start
    1415              :                     {
    1416            0 :                         log<saving_start>( { 1, new_cnt0 } );
    1417              :                     }
    1418              :                     else // on a restart after a timeout we don't log
    1419              :                     {
    1420            0 :                         restartWriting = false;
    1421              :                     }
    1422              : 
    1423            0 :                     m_writing = WRITING;
    1424              : 
    1425              :                     // fall through
    1426            0 :                 case WRITING:
    1427            0 :                     if( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 )
    1428              :                     {
    1429            0 :                         m_currSaveStart       = m_currChunkStart;
    1430            0 :                         m_currSaveStop        = m_nextChunkStart + m_writeChunkLength;
    1431            0 :                         m_currSaveStopFrameNo = new_cnt0;
    1432              : 
    1433              : #ifdef SW_DEBUG
    1434              :                         std::cerr << __FILE__ << " " << __LINE__ << " WRITING " << m_currImage << " "
    1435              :                                   << m_nextChunkStart << " "
    1436              :                                   << ( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 ) << " "
    1437              :                                   << ( m_currImageTime - m_currChunkStartTime > m_maxChunkTime ) << " " << new_cnt0
    1438              :                                   << "\n";
    1439              : #endif
    1440              : 
    1441              :                         // Now tell the writer to get going
    1442            0 :                         if( sem_post( &m_swSemaphore ) < 0 )
    1443              :                         {
    1444            0 :                             log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
    1445            0 :                             return;
    1446              :                         }
    1447              : 
    1448            0 :                         m_nextChunkStart = ( ( m_currImage + 1 ) / m_writeChunkLength ) * m_writeChunkLength;
    1449            0 :                         if( m_nextChunkStart >= m_circBuffLength )
    1450              :                         {
    1451            0 :                             m_nextChunkStart = 0;
    1452              :                         }
    1453              : 
    1454            0 :                         m_currChunkStart     = m_nextChunkStart;
    1455            0 :                         m_currChunkStartTime = m_currImageTime;
    1456              :                     }
    1457            0 :                     else if( m_currImageTime - m_currChunkStartTime > m_maxChunkTime )
    1458              :                     {
    1459            0 :                         m_currSaveStart       = m_currChunkStart;
    1460            0 :                         m_currSaveStop        = m_currImage + 1;
    1461            0 :                         m_currSaveStopFrameNo = new_cnt0;
    1462              : 
    1463              : #ifdef SW_DEBUG
    1464              :                         std::cerr << __FILE__ << " " << __LINE__ << " IMAGE TIME WRITING " << m_currImage << " "
    1465              :                                   << m_nextChunkStart << " "
    1466              :                                   << ( m_currImage - m_nextChunkStart == m_writeChunkLength - 1 ) << " "
    1467              :                                   << ( m_currImageTime - m_currChunkStartTime > m_maxChunkTime ) << " " << new_cnt0
    1468              :                                   << "\n";
    1469              : #endif
    1470              : 
    1471              :                         // Now tell the writer to get going
    1472            0 :                         if( sem_post( &m_swSemaphore ) < 0 )
    1473              :                         {
    1474            0 :                             log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
    1475            0 :                             return;
    1476              :                         }
    1477              : 
    1478            0 :                         m_writing      = START_WRITING;
    1479            0 :                         restartWriting = true;
    1480              :                     }
    1481            0 :                     break;
    1482              : 
    1483            0 :                 case STOP_WRITING:
    1484            0 :                     m_currSaveStart       = m_currChunkStart;
    1485            0 :                     m_currSaveStop        = m_currImage + 1;
    1486            0 :                     m_currSaveStopFrameNo = new_cnt0;
    1487              : 
    1488              : #ifdef SW_DEBUG
    1489              :                     std::cerr << __FILE__ << " " << __LINE__ << " STOP_WRITING\n";
    1490              : #endif
    1491              : 
    1492              :                     // Now tell the writer to get going
    1493            0 :                     if( sem_post( &m_swSemaphore ) < 0 )
    1494              :                     {
    1495            0 :                         log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
    1496            0 :                         return;
    1497              :                     }
    1498            0 :                     restartWriting = false;
    1499            0 :                     break;
    1500              : 
    1501            0 :                 default:
    1502            0 :                     break;
    1503              :                 }
    1504              : 
    1505            0 :                 ++m_currImage;
    1506            0 :                 if( m_currImage >= m_circBuffLength )
    1507              :                 {
    1508            0 :                     m_currImage = 0;
    1509              :                 }
    1510              :             }
    1511              :             else
    1512              :             {
    1513              :                 // If semaphore times-out or errors, we first cleanup any writing that needs to be done
    1514              :                 // we can also get here if a signal interrupts the sem wait which is triggered by INDI callbacks
    1515            0 :                 switch( m_writing )
    1516              :                 {
    1517            0 :                 case WRITING:
    1518              :                     // Here, if there is at least 1 image, we check for delta-time > m_maxChunkTime
    1519              :                     //  then write
    1520            0 :                     if( ( m_currImage - m_nextChunkStart > 0 ) &&
    1521            0 :                         ( mx::sys::get_curr_time() - m_currChunkStartTime > m_maxChunkTime ) )
    1522              :                     {
    1523            0 :                         m_currSaveStart       = m_currChunkStart;
    1524            0 :                         m_currSaveStop        = m_currImage;
    1525            0 :                         m_currSaveStopFrameNo = last_cnt0;
    1526              : 
    1527              : #ifdef SW_DEBUG
    1528              :                         std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT WRITING " << " " << m_currImage << " "
    1529              :                                   << m_nextChunkStart << " " << ( m_currImage - m_nextChunkStart ) << " " << last_cnt0
    1530              :                                   << "\n";
    1531              : #endif
    1532              : 
    1533              :                         // Now tell the writer to get going
    1534            0 :                         if( sem_post( &m_swSemaphore ) < 0 )
    1535              :                         {
    1536            0 :                             log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
    1537            0 :                             return;
    1538              :                         }
    1539              : 
    1540            0 :                         m_writing      = START_WRITING;
    1541            0 :                         restartWriting = true;
    1542              :                     }
    1543            0 :                     break;
    1544            0 :                 case STOP_WRITING:
    1545              :                     // If we timed-out while STOP_WRITING is set, we trigger a write.
    1546            0 :                     m_currSaveStart       = m_currChunkStart;
    1547            0 :                     m_currSaveStop        = m_currImage;
    1548            0 :                     m_currSaveStopFrameNo = last_cnt0;
    1549              : 
    1550              : #ifdef SW_DEBUG
    1551              :                     std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT STOP_WRITING\n";
    1552              : #endif
    1553              : 
    1554              :                     // Now tell the writer to get going
    1555            0 :                     if( sem_post( &m_swSemaphore ) < 0 )
    1556              :                     {
    1557            0 :                         log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
    1558            0 :                         return;
    1559              :                     }
    1560            0 :                     restartWriting = false;
    1561            0 :                     break;
    1562            0 :                 default:
    1563            0 :                     break;
    1564              :                 }
    1565              : 
    1566            0 :                 if( image.md[0].sem <= 0 )
    1567              :                 {
    1568            0 :                     break; // Indicates that the server has cleaned up.
    1569              :                 }
    1570              : 
    1571              :                 // Check for why we timed out
    1572            0 :                 if( errno == EINTR )
    1573              :                 {
    1574            0 :                     break; // This will indicate time to shutdown, loop will exit normally flags set.
    1575              :                 }
    1576              : 
    1577              :                 // ETIMEDOUT just means we should wait more.
    1578              :                 // Otherwise, report an error.
    1579            0 :                 if( errno != ETIMEDOUT )
    1580              :                 {
    1581            0 :                     log<software_error>( { __FILE__, __LINE__, errno, "sem_timedwait" } );
    1582            0 :                     break;
    1583              :                 }
    1584              : 
    1585              :                 // Check if the file has disappeared.
    1586              :                 int  SM_fd;
    1587              :                 char SM_fname[200];
    1588            0 :                 ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), m_shmimName.c_str() );
    1589            0 :                 SM_fd = open( SM_fname, O_RDWR );
    1590            0 :                 if( SM_fd == -1 )
    1591              :                 {
    1592            0 :                     m_restart = true;
    1593              :                 }
    1594            0 :                 close( SM_fd );
    1595              : 
    1596              :                 // Check if the inode changed
    1597              :                 struct stat buffer;
    1598            0 :                 int         rv = stat( SM_fname, &buffer );
    1599            0 :                 if( rv != 0 )
    1600              :                 {
    1601            0 :                     m_restart = true;
    1602              :                 }
    1603              : 
    1604            0 :                 if( buffer.st_ino != inode )
    1605              :                 {
    1606              : #ifdef SW_DEBUG
    1607              :                     std::cerr << "Restarting due to inode . . . \n";
    1608              : #endif
    1609            0 :                     m_restart = true;
    1610              :                 }
    1611              :             }
    1612              :         }
    1613              : 
    1614              :         ///\todo might still be writing here, so must check
    1615              :         // If semaphore times-out or errors, we first cleanup any writing that needs to be done
    1616            0 :         if( m_writing == WRITING || m_writing == STOP_WRITING )
    1617              :         {
    1618              :             // Here, if there is at least 1 image, then write
    1619            0 :             if( ( m_currImage - m_nextChunkStart > 0 ) )
    1620              :             {
    1621            0 :                 m_currSaveStart       = m_currChunkStart;
    1622            0 :                 m_currSaveStop        = m_currImage;
    1623            0 :                 m_currSaveStopFrameNo = last_cnt0;
    1624              : 
    1625            0 :                 m_writing = STOP_WRITING;
    1626              : 
    1627            0 :                 std::cerr << __FILE__ << " " << __LINE__ << " WRITING ON RESTART " << last_cnt0 << "\n";
    1628              :                 // Now tell the writer to get going
    1629            0 :                 if( sem_post( &m_swSemaphore ) < 0 )
    1630              :                 {
    1631            0 :                     log<software_critical>( { __FILE__, __LINE__, errno, 0, "Error posting to semaphore" } );
    1632            0 :                     return;
    1633              :                 }
    1634              :             }
    1635              :             else
    1636              :             {
    1637            0 :                 m_writing = NOT_WRITING;
    1638              :             }
    1639              : 
    1640            0 :             while( m_writing != NOT_WRITING )
    1641              :             {
    1642            0 :                 std::cerr << __FILE__ << " " << __LINE__ << " WAITING TO FINISH WRITING " << last_cnt0 << "\n";
    1643            0 :                 sleep( 1 );
    1644              :             }
    1645              :         }
    1646              : 
    1647            0 :         if( m_rawImageCircBuff )
    1648              :         {
    1649            0 :             free( m_rawImageCircBuff );
    1650            0 :             m_rawImageCircBuff = 0;
    1651              :         }
    1652              : 
    1653            0 :         if( m_timingCircBuff )
    1654              :         {
    1655            0 :             free( m_timingCircBuff );
    1656            0 :             m_timingCircBuff = 0;
    1657              :         }
    1658              : 
    1659            0 :         if( opened )
    1660              :         {
    1661            0 :             if( m_semaphoreNumber >= 0 )
    1662              :             {
    1663              :                 ///\todo is this release necessary with closeIM?
    1664            0 :                 image.semReadPID[m_semaphoreNumber] = 0; // release semaphore
    1665              :             }
    1666            0 :             ImageStreamIO_closeIm( &image );
    1667            0 :             opened = false;
    1668              :         }
    1669              : 
    1670              :     } // outer loop, will exit if m_shutdown==true
    1671              : 
    1672              :     // One more check
    1673            0 :     if( m_rawImageCircBuff )
    1674              :     {
    1675            0 :         free( m_rawImageCircBuff );
    1676            0 :         m_rawImageCircBuff = 0;
    1677              :     }
    1678              : 
    1679            0 :     if( m_timingCircBuff )
    1680              :     {
    1681            0 :         free( m_timingCircBuff );
    1682            0 :         m_timingCircBuff = 0;
    1683              :     }
    1684              : 
    1685            0 :     if( opened )
    1686              :     {
    1687            0 :         if( m_semaphoreNumber >= 0 )
    1688              :         {
    1689              :             ///\todo is this release necessary with closeIM?
    1690            0 :             image.semReadPID[m_semaphoreNumber] = 0; // release semaphore.
    1691              :         }
    1692              : 
    1693            0 :         ImageStreamIO_closeIm( &image );
    1694              :     }
    1695              : }
    1696              : 
    1697            0 : void streamWriter::swThreadStart( streamWriter *s ) // Thread-start helper: forwards to the writer loop of the instance s.
    1698              : {
    1699            0 :     s->swThreadExec(); // returns only when the writer loop exits
    1700            0 : }
    1701              : 
    1702            0 : void streamWriter::swThreadExec() // Writer loop: waits on m_swSemaphore and calls doEncode() each time the wait succeeds.
    1703              : {
    1704            0 :     m_swThreadID = syscall( SYS_gettid ); // record the kernel thread id of this thread
    1705              : 
    1706              :     // Wait for the thread starter to finish initializing this thread.
    1707            0 :     while( m_swThreadInit == true && m_shutdown == 0 )
    1708              :     {
    1709            0 :         sleep( 1 );
    1710              :     }
    1711              : 
    1712            0 :     while( !m_shutdown )
    1713              :     {
    1714            0 :         while( !shutdown() && ( !( state() == stateCodes::READY || state() == stateCodes::OPERATING ) ) ) // idle until READY or OPERATING
    1715              :         {
    1716            0 :             sleep( 1 );
    1717              :         }
    1718              : 
    1719            0 :         if( shutdown() )
    1720              :         {
    1721            0 :             break;
    1722              :         }
    1723              : 
    1724              :         timespec ts; // absolute deadline for sem_timedwait: now + m_semWaitNSec
    1725              : 
    1726            0 :         if( clock_gettime( CLOCK_REALTIME, &ts ) < 0 )
    1727              :         {
    1728            0 :             log<software_critical>( { __FILE__, __LINE__, errno, 0, "clock_gettime" } );
    1729            0 :             return; // will trigger a shutdown
    1730              :         }
    1731              : 
    1732            0 :         mx::sys::timespecAddNsec( ts, m_semWaitNSec );
    1733              : 
    1734            0 :         if( sem_timedwait( &m_swSemaphore, &ts ) == 0 )
    1735              :         {
    1736            0 :             if( doEncode() < 0 ) // a negative return from doEncode() ends this thread
    1737              :             {
    1738            0 :                 log<software_critical>( { __FILE__, __LINE__, "error encoding data" } );
    1739            0 :                 return;
    1740              :             }
    1741              :             // Otherwise, success, and we just go on.
    1742              :         }
    1743              :         else
    1744              :         {
    1745              :             // The wait did not succeed: find out why.
    1746            0 :             if( errno == EINTR )
    1747              :             {
    1748            0 :                 continue; // This will probably indicate time to shutdown, loop will exit normally if flags set.
    1749              :             }
    1750              : 
    1751              :             // ETIMEDOUT just means we should wait more.
    1752              :             // Otherwise, report an error.
    1753            0 :             if( errno != ETIMEDOUT )
    1754              :             {
    1755            0 :                 log<software_error>( { __FILE__, __LINE__, errno, "sem_timedwait" } );
    1756            0 :                 break;
    1757              :             }
    1758              :         }
    1759              :     } // outer loop, will exit if m_shutdown==true
    1760              : }
    1761              : 
    1762            4 : int streamWriter::doEncode() // Encode the pending frame range and its timing data with xrif and write both to one new file.
    1763              : { // Returns 0 on success or when there is nothing to do; returns the log<..., -1>() result if the path, directory, or file cannot be set up.
    1764            4 :     if( m_writing == NOT_WRITING )
    1765              :     {
    1766            0 :         return 0;
    1767              :     }
    1768              : 
    1769            4 :     recordSavingState( true );
    1770              : 
    1771              :     // Record these to prevent a change in other thread
    1772            4 :     uint64_t saveStart       = m_currSaveStart;
    1773            4 :     uint64_t saveStopFrameNo = m_currSaveStopFrameNo;
    1774            4 :     size_t   nFrames         = m_currSaveStop - saveStart;
    1775            4 :     size_t   nBytes          = m_width * m_height * m_typeSize; // bytes per frame
    1776              : 
    1777              : #ifdef SW_DEBUG
    1778              :     std::cerr << "nFrames: " << nFrames << "\n";
    1779              : #endif
    1780              : 
    1781            4 :     if( nFrames == 0 ) // can happen during a stop.  just clean up but don't try to write nothing.
    1782              :     {
    1783              : #ifdef SW_DEBUG
    1784              :         std::cerr << "nothing to write\n";
    1785              : #endif
    1786              : 
    1787            0 :         recordSavingStats( true );
    1788              : 
    1789            0 :         if( m_writing == STOP_WRITING )
    1790              :         {
    1791            0 :             m_writing = NOT_WRITING;
    1792            0 :             log<saving_stop>( { 0, saveStopFrameNo } );
    1793              :         }
    1794              : 
    1795            0 :         recordSavingState( true );
    1796              : 
    1797            0 :         return 0;
    1798              :     }
    1799              :     // Configure xrif and copy image data -- this does no allocations
    1800            4 :     int rv = xrif_set_size( m_xrif, m_width, m_height, 1, nFrames, m_dataType );
    1801            4 :     if( rv != XRIF_NOERROR )
    1802              :     {
    1803              :         // This is a big problem.  Report it as "ALERT" and go on.
    1804            0 :         log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST" } );
    1805              :     }
    1806              : 
    1807            4 :     rv = xrif_set_lz4_acceleration( m_xrif, m_lz4accel );
    1808            4 :     if( rv != XRIF_NOERROR )
    1809              :     {
    1810              :         // This may just be out of range, it's only an error.
    1811            0 :         log<software_error>( { __FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error." } );
    1812              :     }
    1813              : 
    1814            4 :     memcpy( m_xrif->raw_buffer, m_rawImageCircBuff + saveStart * nBytes, nFrames * nBytes ); // nFrames frames starting at frame index saveStart
    1815              : 
    1816              :     // Configure xrif and copy timing data -- no allocations
    1817            4 :     rv = xrif_set_size( m_xrif_timing, 5, 1, 1, nFrames, XRIF_TYPECODE_UINT64 ); // 5 uint64 values per frame
    1818            4 :     if( rv != XRIF_NOERROR )
    1819              :     {
    1820              :         // This is a big problem.  Report it as "ALERT" and go on.
    1821            0 :         log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST." } );
    1822              :     }
    1823              : 
    1824            4 :     rv = xrif_set_lz4_acceleration( m_xrif_timing, m_lz4accel );
    1825            4 :     if( rv != XRIF_NOERROR )
    1826              :     {
    1827              :         // This may just be out of range, it's only an error.
    1828            0 :         log<software_error>( { __FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error." } );
    1829              :     }
    1830              : 
    1831              : #ifdef SW_DEBUG
    1832              :     for( size_t nF = 0; nF < nFrames; ++nF )
    1833              :     {
    1834              :         std::cerr << "      " << ( m_timingCircBuff + saveStart * 5 + nF * 5 )[0] << "\n";
    1835              :     }
    1836              : #endif
    1837              : 
    1838            4 :     memcpy( m_xrif_timing->raw_buffer, m_timingCircBuff + saveStart * 5, nFrames * 5 * sizeof( uint64_t ) );
    1839              : 
    1840            4 :     rv = xrif_encode( m_xrif );
    1841            4 :     if( rv != XRIF_NOERROR )
    1842              :     {
    1843              :         // This is a big problem.  Report it as "ALERT" and go on.
    1844            0 :         log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST." } );
    1845              :     }
    1846              : 
    1847            4 :     rv = xrif_write_header( m_xrif_header, m_xrif );
    1848            4 :     if( rv != XRIF_NOERROR )
    1849              :     {
    1850              :         // This is a big problem.  Report it as "ALERT" and go on.
    1851            0 :         log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST." } );
    1852              :     }
    1853              : 
    1854            4 :     rv = xrif_encode( m_xrif_timing );
    1855            4 :     if( rv != XRIF_NOERROR )
    1856              :     {
    1857              :         // This is a big problem.  Report it as "ALERT" and go on.
    1858            0 :         log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST." } );
    1859              :     }
    1860              : 
    1861            4 :     rv = xrif_write_header( m_xrif_timing_header, m_xrif_timing );
    1862            4 :     if( rv != XRIF_NOERROR )
    1863              :     {
    1864              :         // This is a big problem.  Report it as "ALERT" and go on.
    1865            0 :         log<software_alert>( { __FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST" } );
    1866              :     }
    1867              : 
    1868              :     // Now break down the acq time of the first image in the buffer for use in file name
    1869              :     // NOTE(review): reads a timespec from element 1 of the first frame's 5-element timing entry; assumes timespec overlays two uint64 values -- confirm.
    1870            4 :     timespec *fts = reinterpret_cast<timespec *>( m_timingCircBuff + saveStart * 5 + 1 );
    1871              : 
    1872            4 :     std::string fileName;
    1873            4 :     std::string relPath;
    1874            8 :     mx::error_t errc = file::fileTimeRelPath( fileName, relPath, m_outName, "xrif", fts->tv_sec, fts->tv_nsec );
    1875            4 :     if(errc != mx::error_t::noerror)
    1876              :     {
    1877            0 :         std::string msg = "error from file::fileTimeRelPath: ";
    1878            0 :         msg += mx::errorMessage(errc);
    1879            0 :         msg += " (" + std::string(mx::errorName(errc)) + ")";
    1880            0 :         return log<software_error, -1>( { __FILE__, __LINE__, msg } );
    1881            0 :     }
    1882              : 
    1883            4 :     std::string fullPath = m_rawimageDir + '/' + relPath;
    1884              : 
    1885              :     try
    1886              :     {
    1887            4 :         std::filesystem::create_directories( fullPath ); // this does nothing if fname already exists
    1888              :     }
    1889            0 :     catch( const std::filesystem::filesystem_error &e )
    1890              :     {
    1891            0 :         std::string msg = "filesystem_error from std::create_directories. ";
    1892            0 :         msg += e.what();
    1893            0 :         msg += " code: ";
    1894            0 :         msg += std::to_string( e.code().value() ); // to_string is required: += with an int appends a single char
    1895            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, msg } );
    1896            0 :     }
    1897            0 :     catch( const std::exception &e )
    1898              :     {
    1899            0 :         std::string msg = "exception from std::create_directories. ";
    1900            0 :         msg += e.what();
    1901            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, msg } );
    1902            0 :     }
    1903              : 
    1904            4 :     m_outFilePath = fullPath + '/' + fileName;
    1905            4 :     FILE *fp_xrif = fopen( m_outFilePath.c_str(), "wb" );
    1906            4 :     if( fp_xrif == NULL )
    1907              :     {
    1908              :         // This is it.  If we can't write data to disk need to fix.
    1909            0 :         return log<software_critical, -1>( { __FILE__, __LINE__, errno, 0, "failed to open file for writing" } );
    1910              :     }
    1911              : 
    1912            4 :     size_t bw = fwrite( m_xrif_header, sizeof( uint8_t ), XRIF_HEADER_SIZE, fp_xrif ); // file order: image header, image data, timing header, timing data
    1913              : 
    1914            4 :     if( bw != XRIF_HEADER_SIZE )
    1915              :     {
    1916            0 :         log<software_alert>( { __FILE__,
    1917              :                                __LINE__,
    1918            0 :                                errno,
    1919              :                                0,
    1920            0 :                                "failure writing header to file.  DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
    1921              :         // We go on . . .
    1922              :     }
    1923              : 
    1924            4 :     bw = fwrite( m_xrif->raw_buffer, sizeof( uint8_t ), m_xrif->compressed_size, fp_xrif );
    1925              : 
    1926            4 :     if( bw != m_xrif->compressed_size )
    1927              :     {
    1928            0 :         log<software_alert>( { __FILE__,
    1929              :                                __LINE__,
    1930            0 :                                errno,
    1931              :                                0,
    1932            0 :                                "failure writing data to file.  DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
    1933              :     }
    1934              : 
    1935            4 :     bw = fwrite( m_xrif_timing_header, sizeof( uint8_t ), XRIF_HEADER_SIZE, fp_xrif );
    1936              : 
    1937            4 :     if( bw != XRIF_HEADER_SIZE )
    1938              :     {
    1939            0 :         log<software_alert>(
    1940              :             { __FILE__,
    1941              :               __LINE__,
    1942            0 :               errno,
    1943              :               0,
    1944            0 :               "failure writing timing header to file.  DATA LOSS LIKELY.  bytes = " + std::to_string( bw ) } );
    1945              :     }
    1946              : 
    1947            4 :     bw = fwrite( m_xrif_timing->raw_buffer, sizeof( uint8_t ), m_xrif_timing->compressed_size, fp_xrif );
    1948              : 
    1949            4 :     if( bw != m_xrif_timing->compressed_size )
    1950              :     {
    1951            0 :         log<software_alert>(
    1952              :             { __FILE__,
    1953              :               __LINE__,
    1954            0 :               errno,
    1955              :               0,
    1956            0 :               "failure writing timing data to file. DATA LOSS LIKELY. bytes = " + std::to_string( bw ) } );
    1957              :     }
    1958              : 
    1959            4 :     fclose( fp_xrif );
    1960              : 
    1961            4 :     recordSavingStats( true );
    1962              : 
    1963            4 :     if( m_writing == STOP_WRITING ) // a pending stop request completes once this chunk is on disk
    1964              :     {
    1965            0 :         m_writing = NOT_WRITING;
    1966            0 :         log<saving_stop>( { 0, saveStopFrameNo } );
    1967              :     }
    1968              : 
    1969            4 :     recordSavingState( true );
    1970              : 
    1971            4 :     return 0;
    1972              : 
    1973            4 : } // doEncode
    1974              : 
    1975            3 : INDI_NEWCALLBACK_DEFN( streamWriter, m_indiP_writing )
    1976              : ( const pcf::IndiProperty &ipRecv )
    1977              : { // Callback for the writing switch: turns a received "toggle" state into a start/stop request in m_writing. Returns 0 on the paths visible here.
    1978            3 :     INDI_VALIDATE_CALLBACK_PROPS( m_indiP_writing, ipRecv );
    1979              : 
    1980              :     if( !ipRecv.find( "toggle" ) ) // no "toggle" element in the received property: nothing to do
    1981              :     {
    1982              :         return 0;
    1983              :     }
    1984              : 
    1985              :     if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::Off &&
    1986              :         ( m_writing == WRITING || m_writing == START_WRITING ) ) // Off only has an effect while writing or starting
    1987              :     {
    1988              :         m_writing = STOP_WRITING;
    1989              :     }
    1990              : 
    1991              :     if( ipRecv["toggle"].getSwitchState() == pcf::IndiElement::On && m_writing == NOT_WRITING ) // On only has an effect while fully stopped
    1992              :     {
    1993              :         m_writing = START_WRITING;
    1994              :     }
    1995              : 
    1996              :     return 0;
    1997              : }
    1998              : 
    1999            0 : void streamWriter::updateINDI() // Publish the writing switch and the xrif statistics through INDI.
    2000              : {
    2001              :     // Only update in the steady states (NOT_WRITING, WRITING); nothing is published while START_WRITING or STOP_WRITING is pending
    2002            0 :     if( m_writing == NOT_WRITING || m_writing == WRITING )
    2003              :     {
    2004            0 :         if( m_xrif && m_writing == WRITING )
    2005              :         {
    2006              :             // "MBsec" values are the xrif rates divided by 1048576; "FPS" values are the rates divided by m_width * m_height * m_typeSize. (presumably the rates are in bytes/sec -- confirm against xrif)
    2007            0 :             indi::updateSwitchIfChanged( m_indiP_writing, "toggle", pcf::IndiElement::On, m_indiDriver, INDI_OK );
    2008            0 :             indi::updateIfChanged( m_indiP_xrifStats, "ratio", m_xrif->compression_ratio, m_indiDriver, INDI_BUSY );
    2009            0 :             indi::updateIfChanged(
    2010            0 :                 m_indiP_xrifStats, "encodeMBsec", m_xrif->encode_rate / 1048576.0, m_indiDriver, INDI_BUSY );
    2011            0 :             indi::updateIfChanged( m_indiP_xrifStats,
    2012              :                                    "encodeFPS",
    2013            0 :                                    m_xrif->encode_rate / ( m_width * m_height * m_typeSize ),
    2014              :                                    m_indiDriver,
    2015              :                                    INDI_BUSY );
    2016            0 :             indi::updateIfChanged(
    2017            0 :                 m_indiP_xrifStats, "differenceMBsec", m_xrif->difference_rate / 1048576.0, m_indiDriver, INDI_BUSY );
    2018            0 :             indi::updateIfChanged( m_indiP_xrifStats,
    2019              :                                    "differenceFPS",
    2020            0 :                                    m_xrif->difference_rate / ( m_width * m_height * m_typeSize ),
    2021              :                                    m_indiDriver,
    2022              :                                    INDI_BUSY );
    2023            0 :             indi::updateIfChanged(
    2024            0 :                 m_indiP_xrifStats, "reorderMBsec", m_xrif->reorder_rate / 1048576.0, m_indiDriver, INDI_BUSY );
    2025            0 :             indi::updateIfChanged( m_indiP_xrifStats,
    2026              :                                    "reorderFPS",
    2027            0 :                                    m_xrif->reorder_rate / ( m_width * m_height * m_typeSize ),
    2028              :                                    m_indiDriver,
    2029              :                                    INDI_BUSY );
    2030            0 :             indi::updateIfChanged(
    2031            0 :                 m_indiP_xrifStats, "compressMBsec", m_xrif->compress_rate / 1048576.0, m_indiDriver, INDI_BUSY );
    2032            0 :             indi::updateIfChanged( m_indiP_xrifStats,
    2033              :                                    "compressFPS",
    2034            0 :                                    m_xrif->compress_rate / ( m_width * m_height * m_typeSize ),
    2035              :                                    m_indiDriver,
    2036              :                                    INDI_BUSY );
    2037              :         }
    2038              :         else // not writing, or no xrif handle: report the switch as Off and zero every statistic
    2039              :         {
    2040            0 :             indi::updateSwitchIfChanged( m_indiP_writing, "toggle", pcf::IndiElement::Off, m_indiDriver, INDI_OK );
    2041            0 :             indi::updateIfChanged( m_indiP_xrifStats, "ratio", 0.0, m_indiDriver, INDI_IDLE );
    2042            0 :             indi::updateIfChanged( m_indiP_xrifStats, "encodeMBsec", 0.0, m_indiDriver, INDI_IDLE );
    2043            0 :             indi::updateIfChanged( m_indiP_xrifStats, "encodeFPS", 0.0, m_indiDriver, INDI_IDLE );
    2044            0 :             indi::updateIfChanged( m_indiP_xrifStats, "differenceMBsec", 0.0, m_indiDriver, INDI_IDLE );
    2045            0 :             indi::updateIfChanged( m_indiP_xrifStats, "differenceFPS", 0.0, m_indiDriver, INDI_IDLE );
    2046            0 :             indi::updateIfChanged( m_indiP_xrifStats, "reorderMBsec", 0.0, m_indiDriver, INDI_IDLE );
    2047            0 :             indi::updateIfChanged( m_indiP_xrifStats, "reorderFPS", 0.0, m_indiDriver, INDI_IDLE );
    2048            0 :             indi::updateIfChanged( m_indiP_xrifStats, "compressMBsec", 0.0, m_indiDriver, INDI_IDLE );
    2049            0 :             indi::updateIfChanged( m_indiP_xrifStats, "compressFPS", 0.0, m_indiDriver, INDI_IDLE );
    2050              :         }
    2051              :     }
    2052            0 : }
    2052              : 
    2053            0 : int streamWriter::checkRecordTimes() // Forwards to telemeterT::checkRecordTimes and returns its result.
    2054              : {
    2055            0 :     return telemeterT::checkRecordTimes( telem_saving_state() ); // passes a default-constructed telem_saving_state
    2056              : }
    2057              : 
    2058            0 : int streamWriter::recordTelem( const telem_saving_state * ) // The unnamed pointer argument is not used.
    2059              : {
    2060            0 :     return recordSavingState( true ); // force == true: always emit a saving-state record
    2061              : }
    2062              : 
    2063            8 : int streamWriter::recordSavingState( bool force ) // Emit a telem_saving_state record when the state or m_currSaveStart changed, or when force is true. Always returns 0.
    2064              : {
    2065              :     static int16_t  lastState     = -1; // last values recorded; function-local statics, so shared by every call
    2066              :     static uint64_t currSaveStart = -1; // -1 converts to the maximum uint64_t value
    2067              : 
    2068              :     int16_t state; // 1 while m_writing is WRITING, START_WRITING, or STOP_WRITING; otherwise 0
    2069            8 :     if( m_writing == WRITING || m_writing == START_WRITING ||
    2070            0 :         m_writing == STOP_WRITING ) // Changed from just writing 5/2024
    2071            8 :         state = 1;
    2072              :     else
    2073            0 :         state = 0;
    2074              : 
    2075            8 :     if( state != lastState || m_currSaveStart != currSaveStart || force )
    2076              :     {
    2077            8 :         telem<telem_saving_state>( { state, m_currSaveStart } );
    2078              : 
    2079            8 :         lastState     = state;
    2080            8 :         currSaveStart = m_currSaveStart;
    2081              :     }
    2082              : 
    2083            8 :     return 0;
    2084              : }
    2085              : 
    2086            4 : int streamWriter::recordSavingStats( bool force ) // Emit a telem_saving record when any m_xrif size or rate changed, or when force is true. Always returns 0.
    2087              : {
    2088              :     static uint32_t last_rawSize        = -1; // last values recorded; function-local statics, so shared by every call
    2089              :     static uint32_t last_compressedSize = -1; // -1 converts to the maximum uint32_t value
    2090              :     static float    last_encodeRate     = -1;
    2091              :     static float    last_differenceRate = -1;
    2092              :     static float    last_reorderRate    = -1;
    2093              :     static float    last_compressRate   = -1;
    2094              : 
    2095              :     // NOTE(review): m_xrif is dereferenced here without a null check -- confirm callers only reach this with a valid handle.
    2096            4 :     if( m_xrif->raw_size != last_rawSize || m_xrif->compressed_size != last_compressedSize ||
    2097            2 :         m_xrif->encode_rate != last_encodeRate || m_xrif->difference_rate != last_differenceRate ||
    2098            0 :         m_xrif->reorder_rate != last_reorderRate || m_xrif->compress_rate != last_compressRate || force )
    2099              :     {
    2100            4 :         telem<telem_saving>( { (uint32_t)m_xrif->raw_size,
    2101            0 :                                (uint32_t)m_xrif->compressed_size,
    2102            0 :                                (float)m_xrif->encode_rate,
    2103            0 :                                (float)m_xrif->difference_rate,
    2104            0 :                                (float)m_xrif->reorder_rate,
    2105            4 :                                (float)m_xrif->compress_rate } );
    2106              : 
    2107            4 :         last_rawSize        = m_xrif->raw_size; // remember what was recorded; stored narrowed to uint32_t / float
    2108            4 :         last_compressedSize = m_xrif->compressed_size;
    2109            4 :         last_encodeRate     = m_xrif->encode_rate;
    2110            4 :         last_differenceRate = m_xrif->difference_rate;
    2111            4 :         last_reorderRate    = m_xrif->reorder_rate;
    2112            4 :         last_compressRate   = m_xrif->compress_rate;
    2113              :     }
    2114              : 
    2115            4 :     return 0;
    2116              : }
    2116              : 
    2117              : } // namespace app
    2118              : } // namespace MagAOX
    2119              : 
    2120              : #endif
        

Generated by: LCOV version 2.0-1