LCOV - code coverage report
Current view: top level - utils/shmimLat - shmimLat.hpp (source / functions) Coverage Total Hit
Test: MagAOX Lines: 0.0 % 143 0
Test Date: 2026-01-03 21:03:39 Functions: 0.0 % 7 0

            Line data    Source code
       1              : /** \file shmimLat.hpp
       2              :  * \brief The shmimLat class declaration and definition.
       3              :  *
       4              :  * \ingroup xrif2hmim_files
       5              :  */
       6              : 
       7              : #ifndef shmimLat_hpp
       8              : #define shmimLat_hpp
       9              : 
      10              : #include <ImageStreamIO/ImageStruct.h>
      11              : #include <ImageStreamIO/ImageStreamIO.h>
      12              : 
      13              : #include <xrif/xrif.h>
      14              : 
      15              : #include <mx/ioutils/fileUtils.hpp>
      16              : #include <mx/improc/eigenCube.hpp>
      17              : 
      18              : #include <mx/sys/timeUtils.hpp>
      19              : #include <mx/math/plot/gnuPlot.hpp>
      20              : 
      21              : #include "../../libMagAOX/libMagAOX.hpp"
      22              : 
      23              : /** \defgroup shmimLat shmimLat: plot data from a shmim in gnuplot
      24              :  * \brief Monitor a shmim and update a gnuplot plot
      25              :  *
      26              :  * <a href="../handbook/utils/shmimLat.html">Utility Documentation</a>
      27              :  *
      28              :  * \ingroup utils
      29              :  *
      30              :  */
      31              : 
      32              : /** \defgroup shmimLat_files shmimLat Files
      33              :  * \ingroup shmimLat
      34              :  */
      35              : 
      36              : bool g_timeToDie = false;
      37              : 
      38            0 : void sigTermHandler( int signum, siginfo_t *siginf, void *ucont )
      39              : {
      40              :     // Suppress those warnings . . .
      41              :     static_cast<void>( signum );
      42              :     static_cast<void>( siginf );
      43              :     static_cast<void>( ucont );
      44              : 
      45            0 :     std::cerr << "\n"; // clear out the ^C char
      46              : 
      47            0 :     g_timeToDie = true;
      48            0 : }
      49              : 
      50              : /// A utility to stream MagaO-X images from xrif compressed archives to an ImageStreamIO stream.
      51              : /**
      52              :  * \todo finish md doc for shmimLat
      53              :  *
      54              :  * \ingroup shmimLat
      55              :  */
      56              : class shmimLat : public mx::app::application
      57              : {
      58              :   protected:
      59              :     /** \name Configurable Parameters
      60              :      * @{
      61              :      */
      62              : 
      63              : 
      64              :     ///@}
      65              : 
      66              :     /// Structure to manage the image threads, including startup.
      67              :     struct s_imageThread
      68              :     {
      69              :         shmimLat *m_sp; ///< a pointer to a shmimLat instance (normally this)
      70              : 
      71              :         std::thread *m_thread{ nullptr }; ///< Thread for receiving image slice updates.  A pointer to allow copying,
      72              :                                           ///< but must be deleted in d'tor of parent.
      73              : 
      74              :         std::string m_shmimName; ///< the name of the image to subscribe from this thread
      75              : 
      76              :         std::vector<std::vector<float>> m_y;
      77              : 
      78              :         /// C'tor to create the thread object
      79            0 :         s_imageThread()
      80            0 :         {
      81            0 :             m_thread = new std::thread;
      82            0 :         }
      83              :     };
      84              : 
      85              :     std::vector<s_imageThread> m_imageThreads; ///< The image threads, one per shared memory stream.
      86              : 
      87              :     std::vector<timespec> m_atimes;
      88              :     std::vector<timespec> m_arrtimes;
      89              :     ///@}
      90              : 
      91              :   public:
      92              :     ~shmimLat();
      93              : 
      94              :     virtual void setupConfig();
      95              : 
      96              :     virtual void loadConfig();
      97              : 
      98              :     virtual int execute();
      99              : 
     100              : 
     101              :   private:
     102              :     /// Thread starter, called by imageThreadStart on thread construction.  Calls imageThreadExec.
     103              :     static void internal_imageThreadStart( s_imageThread *mit /**< [in] a pointer to an s_imageThread structure */ );
     104              : 
     105              :   public:
     106              :     /// Start the image thread.
     107              :     int imageThreadStart( size_t thno /**< [in] the thread to start */ );
     108              : 
     109              :     /// Execute the image thread.
     110              :     void imageThreadExec( s_imageThread *mit /**< [in] a pointer to an s_imageThread structure */ );
     111              : };
     112              : 
     113            0 : inline shmimLat::~shmimLat()
     114              : {
     115            0 : }
     116              : 
     117            0 : inline void shmimLat::setupConfig()
     118              : {
     119            0 :     config.add( "shmimName",
     120              :                 "n",
     121              :                 "shmimName",
     122              :                 argType::Required,
     123              :                 "",
     124              :                 "shmimName",
     125              :                 false,
     126              :                 "vector<string>",
     127              :                 "The names of the shared memory buffer to stream to.  Default is \"shmimLat\"" );
     128              : 
     129            0 :     config.add( "circBuffLength",
     130              :                 "L",
     131              :                 "circBuffLength",
     132              :                 argType::Required,
     133              :                 "",
     134              :                 "circBuffLength",
     135              :                 false,
     136              :                 "int",
     137              :                 "The length of the shared memory circular buffer. Default is 1." );
     138              : 
     139              : 
     140            0 : }
     141              : 
     142            0 : inline void shmimLat::loadConfig()
     143              : {
     144            0 :     std::vector<std::string> shmimNames;
     145            0 :     config( shmimNames, "shmimName" );
     146              : 
     147            0 :     if( shmimNames.size() == 0 )
     148              :     {
     149            0 :         std::cerr << "shmim names not specified with -n\n";
     150            0 :         doHelp = true;
     151            0 :         return;
     152              :     }
     153              : 
     154            0 :     for( auto &s : shmimNames )
     155              :     {
     156            0 :         s_imageThread nt;
     157            0 :         nt.m_sp        = this;
     158            0 :         nt.m_shmimName = s;
     159            0 :         m_imageThreads.push_back( nt );
     160            0 :     }
     161              : 
     162              : 
     163            0 : }
     164              : 
     165            0 : inline int shmimLat::execute()
     166              : {
     167              : 
     168              :     // Install signal handling
     169              :     struct sigaction act;
     170              :     sigset_t         set;
     171              : 
     172            0 :     act.sa_sigaction = sigTermHandler;
     173            0 :     act.sa_flags     = SA_SIGINFO;
     174            0 :     sigemptyset( &set );
     175            0 :     act.sa_mask = set;
     176              : 
     177            0 :     errno = 0;
     178            0 :     if( sigaction( SIGTERM, &act, 0 ) < 0 )
     179              :     {
     180            0 :         std::cerr << " (" << invokedName << "): error setting SIGTERM handler: " << strerror( errno ) << "\n";
     181            0 :         return -1;
     182              :     }
     183              : 
     184            0 :     errno = 0;
     185            0 :     if( sigaction( SIGQUIT, &act, 0 ) < 0 )
     186              :     {
     187            0 :         std::cerr << " (" << invokedName << "): error setting SIGQUIT handler: " << strerror( errno ) << "\n";
     188            0 :         return -1;
     189              :     }
     190              : 
     191            0 :     errno = 0;
     192            0 :     if( sigaction( SIGINT, &act, 0 ) < 0 )
     193              :     {
     194            0 :         std::cerr << " (" << invokedName << "): error setting SIGINT handler: " << strerror( errno ) << "\n";
     195            0 :         return -1;
     196              :     }
     197              : 
     198              : 
     199            0 :     m_atimes.resize(120000);
     200            0 :     m_arrtimes.resize(m_atimes.size());
     201              : 
     202            0 :     imageThreadExec(&m_imageThreads[0]);
     203              : 
     204            0 :     std::ofstream fout("times");
     205              : 
     206            0 :     fout.precision(15);
     207              : 
     208            0 :     long atime0 = m_atimes[0].tv_sec;
     209              : 
     210            0 :     for(size_t n = 0; n < m_atimes.size(); ++n)
     211              :     {
     212            0 :         double at = m_atimes[n].tv_sec - atime0 + m_atimes[n].tv_nsec/1e9;
     213            0 :         double arrt = m_arrtimes[n].tv_sec - atime0 + m_arrtimes[n].tv_nsec/1e9;
     214              : 
     215            0 :         fout << at << ' ' << arrt << ' ' << arrt-at << '\n';
     216              :     }
     217            0 :     fout.close();
     218            0 :     return 0;
     219            0 : }
     220              : 
     221              : inline void shmimLat::internal_imageThreadStart( s_imageThread *mit )
     222              : {
     223              :     mit->m_sp->imageThreadExec( mit );
     224              : }
     225              : 
     226              : inline int shmimLat::imageThreadStart( size_t thno )
     227              : {
     228              :     try
     229              :     {
     230              :         *m_imageThreads[thno].m_thread = std::thread( internal_imageThreadStart, &m_imageThreads[thno] );
     231              :     }
     232              :     catch( const std::exception &e )
     233              :     {
     234              :         std::cerr << std::string( "exception in image thread startup: " ) + e.what() << ' ' << __FILE__ << ' '
     235              :                   << __LINE__ << '\n';
     236              :         return -1;
     237              :     }
     238              :     catch( ... )
     239              :     {
     240              :         std::cerr << "unknown exception in image thread startup: " << __FILE__ << ' ' << __LINE__ << '\n';
     241              :         return -1;
     242              :     }
     243              : 
     244              :     if( !m_imageThreads[thno].m_thread->joinable() )
     245              :     {
     246              :         std::cerr << "image thread did not start: " << __FILE__ << ' ' << __LINE__ << '\n';
     247              :         return -1;
     248              :     }
     249              : 
     250              :     return 0;
     251              : }
     252              : 
     253            0 : inline void shmimLat::imageThreadExec( s_imageThread *mit )
     254              : {
     255            0 :     bool  opened = false;
     256              :     bool  restart;
     257            0 :     ino_t inode           = 0; ///< The inode of the image stream file
     258            0 :     int   semaphoreNumber = 9; ///< The image structure semaphore index.
     259              : 
     260              :     IMAGE    imageStream;
     261              : 
     262            0 :     mx::improc::eigenImage<float> im;
     263              : 
     264            0 :     size_t n = 0;
     265              : 
     266            0 :     while( !g_timeToDie )
     267              :     {
     268              :         /* Initialize ImageStreamIO
     269              :          */
     270            0 :         opened  = false;
     271            0 :         restart = false; // Set this up front, since we're about to restart.
     272              : 
     273            0 :         int logged = 0;
     274            0 :         while( !opened && !g_timeToDie && !restart )
     275              :         {
     276              :             // b/c ImageStreamIO prints every single time, and latest version don't support stopping it yet, and that
     277              :             // isn't thread-safe-able anyway we do our own checks.  This is the same code in ImageStreamIO_openIm...
     278              :             int  SM_fd;
     279              :             char SM_fname[200];
     280            0 :             ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), mit->m_shmimName.c_str() );
     281            0 :             SM_fd = open( SM_fname, O_RDWR );
     282            0 :             if( SM_fd == -1 )
     283              :             {
     284            0 :                 if( !logged )
     285              :                 {
     286            0 :                     std::cerr << "ImageStream " + mit->m_shmimName + " not found (yet).  Retrying . . .\n";
     287              :                 }
     288            0 :                 logged = 1;
     289            0 :                 sleep( 1 ); // be patient
     290            0 :                 continue;
     291              :             }
     292              : 
     293              :             // Found and opened,  close it and then use ImageStreamIO
     294            0 :             logged = 0;
     295            0 :             close( SM_fd );
     296              : 
     297            0 :             if( ImageStreamIO_openIm( &imageStream, mit->m_shmimName.c_str() ) == 0 )
     298              :             {
     299              :                 /// \todo this isn't right--> isn't there a define in cacao to use?
     300            0 :                 if( imageStream.md[0].sem <= semaphoreNumber )
     301              : 
     302              :                 {
     303            0 :                     ImageStreamIO_closeIm( &imageStream );
     304            0 :                     mx::sys::sleep( 1 ); // We just need to wait for the server process to finish startup.
     305              :                 }
     306              :                 else
     307              :                 {
     308            0 :                     opened = true;
     309              :                     char SM_fname[200];
     310            0 :                     ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), mit->m_shmimName.c_str() );
     311              : 
     312              :                     struct stat buffer;
     313            0 :                     int         rv = stat( SM_fname, &buffer );
     314              : 
     315            0 :                     if( rv != 0 )
     316              :                     {
     317            0 :                         std::cerr << "Could not get inode for " + mit->m_shmimName +
     318            0 :                                          ". Source process will need to be restarted.\n";
     319            0 :                         ImageStreamIO_closeIm( &imageStream );
     320            0 :                         return;
     321              :                     }
     322            0 :                     inode = buffer.st_ino;
     323              :                 }
     324              :             }
     325              :             else
     326              :             {
     327            0 :                 mx::sys::sleep( 1 ); // be patient
     328              :             }
     329              :         }
     330              : 
     331            0 :         if( restart )
     332              :         {
     333            0 :             continue; // this is kinda dumb.  we just go around on restart, so why test in the while loop at all?
     334              :         }
     335              : 
     336            0 :         if( g_timeToDie )
     337              :         {
     338            0 :             if( !opened )
     339              :             {
     340            0 :                 return;
     341              :             }
     342              : 
     343            0 :             ImageStreamIO_closeIm( &imageStream );
     344            0 :             return;
     345              :         }
     346              : 
     347              :         semaphoreNumber =
     348            0 :             ImageStreamIO_getsemwaitindex( &imageStream, semaphoreNumber ); // ask for semaphore we had before
     349              : 
     350            0 :         if( semaphoreNumber < 0 )
     351              :         {
     352            0 :             std::cerr << "No valid semaphore found for " + mit->m_shmimName +
     353            0 :                              ". Source process will need to be restarted.\n";
     354            0 :             return;
     355              :         }
     356              : 
     357            0 :         ImageStreamIO_semflush( &imageStream, semaphoreNumber );
     358              : 
     359            0 :         sem_t *sem = imageStream.semptr[semaphoreNumber]; ///< The semaphore to monitor for new image data
     360              : 
     361              : 
     362              : 
     363              : 
     364              :         // This is the main image grabbing loop.
     365            0 :         while( !g_timeToDie && !restart && n < m_atimes.size())
     366              :         {
     367              :             timespec ts;
     368              : 
     369            0 :             if( clock_gettime( CLOCK_REALTIME, &ts ) < 0 )
     370              :             {
     371            0 :                 std::cerr << "error from clock_gettime\n";
     372            0 :                 return;
     373              :             }
     374              : 
     375            0 :             ts.tv_sec += 1;
     376              : 
     377            0 :             if( sem_timedwait( sem, &ts ) == 0 )
     378              :             {
     379              : 
     380            0 :                 if( g_timeToDie || restart )
     381              :                 {
     382              :                     break; // Check for exit signals
     383              :                 }
     384              : 
     385            0 :                 clock_gettime(CLOCK_ISIO, &m_arrtimes[n]);
     386            0 :                 m_atimes[n] = imageStream.md[0].writetime;
     387              : 
     388            0 :                 ++n;
     389              :             }
     390              :             else
     391              :             {
     392            0 :                 if( imageStream.md[0].sem <= 0 )
     393            0 :                     break; // Indicates that the server has cleaned up.
     394              : 
     395              :                 // Check for why we timed out
     396            0 :                 if( errno == EINTR )
     397            0 :                     break; // This indicates signal interrupted us, time to restart or shutdown, loop will exit normally
     398              :                            // if flags set.
     399              : 
     400              :                 // ETIMEDOUT means we should check for deletion, and then wait more.
     401              :                 // Otherwise, report an error.
     402            0 :                 if( errno != ETIMEDOUT )
     403              :                 {
     404            0 :                     std::cerr << "error from sem_timedwait\n";
     405            0 :                     break;
     406              :                 }
     407              : 
     408              :                 // Check if the file has disappeared.
     409              :                 int  SM_fd;
     410              :                 char SM_fname[200];
     411            0 :                 ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), mit->m_shmimName.c_str() );
     412            0 :                 SM_fd = open( SM_fname, O_RDWR );
     413            0 :                 if( SM_fd == -1 )
     414              :                 {
     415            0 :                     restart = true;
     416              :                 }
     417            0 :                 close( SM_fd );
     418              : 
     419              :                 // Check if the inode changed
     420              :                 struct stat buffer;
     421            0 :                 int         rv = stat( SM_fname, &buffer );
     422            0 :                 if( rv != 0 )
     423              :                 {
     424            0 :                     restart = true;
     425              :                 }
     426              : 
     427            0 :                 if( buffer.st_ino != inode )
     428              :                 {
     429            0 :                     restart = true;
     430              :                 }
     431              :             }
     432              :         }
     433              : 
     434            0 :         if(n >= m_atimes.size())
     435              :         {
     436            0 :             break;
     437              :         }
     438              :         // opened == true if we can get to this
     439            0 :         if( semaphoreNumber >= 0 )
     440            0 :             imageStream.semReadPID[semaphoreNumber] = 0; // release semaphore
     441            0 :         ImageStreamIO_closeIm( &imageStream );
     442            0 :         opened = false;
     443              : 
     444              :     } // outer loop, will exit if m_shutdown==true
     445              : 
     446            0 :     if( opened )
     447              :     {
     448            0 :         ImageStreamIO_closeIm( &imageStream );
     449              :     }
     450            0 : }
     451              : 
     452              : #endif // shmimLat_hpp
        

Generated by: LCOV version 2.0-1