Line data Source code
1 : /** \file shmimLat.hpp
2 : * \brief The shmimLat class declaration and definition.
3 : *
4 : * \ingroup shmimLat_files
5 : */
6 :
7 : #ifndef shmimLat_hpp
8 : #define shmimLat_hpp
9 :
10 : #include <ImageStreamIO/ImageStruct.h>
11 : #include <ImageStreamIO/ImageStreamIO.h>
12 :
13 : #include <xrif/xrif.h>
14 :
15 : #include <mx/ioutils/fileUtils.hpp>
16 : #include <mx/improc/eigenCube.hpp>
17 :
18 : #include <mx/sys/timeUtils.hpp>
19 : #include <mx/math/plot/gnuPlot.hpp>
20 :
21 : #include "../../libMagAOX/libMagAOX.hpp"
22 :
23 : /** \defgroup shmimLat shmimLat: measure the latency of a shmim
24 : * \brief Monitor a shmim and record the write and arrival time of each update
25 : *
26 : * <a href="../handbook/utils/shmimLat.html">Utility Documentation</a>
27 : *
28 : * \ingroup utils
29 : *
30 : */
31 :
32 : /** \defgroup shmimLat_files shmimLat Files
33 : * \ingroup shmimLat
34 : */
35 :
/// Shutdown request flag: set by sigTermHandler and polled by the image loops.
/** NOTE(review): a plain bool is neither `volatile std::sig_atomic_t` nor atomic.  The type is left
  * unchanged here so that other translation units referring to it keep working.
  */
bool g_timeToDie = false;

/// Signal handler for SIGTERM, SIGQUIT and SIGINT: requests an orderly shutdown.
/** Only async-signal-safe operations are performed: a newline is written to stderr with write(2)
  * and the shutdown flag is set.  errno is saved and restored so that the interrupted code never
  * observes a value produced by the handler.
  *
  * \param signum  [in] the signal number (unused)
  * \param siginf  [in] the signal information (unused)
  * \param ucont   [in] the user context (unused)
  */
void sigTermHandler( int signum, siginfo_t *siginf, void *ucont )
{
    // Suppress those warnings . . .
    static_cast<void>( signum );
    static_cast<void>( siginf );
    static_cast<void>( ucont );

    const int savedErrno = errno;

    // clear out the ^C char.  iostreams are not async-signal-safe, write(2) is.
    const ssize_t nwritten = ::write( STDERR_FILENO, "\n", 1 );
    static_cast<void>( nwritten );

    g_timeToDie = true;

    errno = savedErrno;
}
49 :
50 : /// A utility to stream MagaO-X images from xrif compressed archives to an ImageStreamIO stream.
51 : /**
52 : * \todo finish md doc for shmimLat
53 : *
54 : * \ingroup shmimLat
55 : */
56 : class shmimLat : public mx::app::application
57 : {
58 : protected:
59 : /** \name Configurable Parameters
60 : * @{
61 : */
62 :
63 :
64 : ///@}
65 :
66 : /// Structure to manage the image threads, including startup.
67 : struct s_imageThread
68 : {
69 : shmimLat *m_sp; ///< a pointer to a shmimLat instance (normally this)
70 :
71 : std::thread *m_thread{ nullptr }; ///< Thread for receiving image slice updates. A pointer to allow copying,
72 : ///< but must be deleted in d'tor of parent.
73 :
74 : std::string m_shmimName; ///< the name of the image to subscribe from this thread
75 :
76 : std::vector<std::vector<float>> m_y;
77 :
78 : /// C'tor to create the thread object
79 0 : s_imageThread()
80 0 : {
81 0 : m_thread = new std::thread;
82 0 : }
83 : };
84 :
85 : std::vector<s_imageThread> m_imageThreads; ///< The image threads, one per shared memory stream.
86 :
87 : std::vector<timespec> m_atimes;
88 : std::vector<timespec> m_arrtimes;
89 : ///@}
90 :
91 : public:
92 : ~shmimLat();
93 :
94 : virtual void setupConfig();
95 :
96 : virtual void loadConfig();
97 :
98 : virtual int execute();
99 :
100 :
101 : private:
102 : /// Thread starter, called by imageThreadStart on thread construction. Calls imageThreadExec.
103 : static void internal_imageThreadStart( s_imageThread *mit /**< [in] a pointer to an s_imageThread structure */ );
104 :
105 : public:
106 : /// Start the image thread.
107 : int imageThreadStart( size_t thno /**< [in] the thread to start */ );
108 :
109 : /// Execute the image thread.
110 : void imageThreadExec( s_imageThread *mit /**< [in] a pointer to an s_imageThread structure */ );
111 : };
112 :
113 0 : inline shmimLat::~shmimLat()
114 : {
115 0 : }
116 :
117 0 : inline void shmimLat::setupConfig()
118 : {
119 0 : config.add( "shmimName",
120 : "n",
121 : "shmimName",
122 : argType::Required,
123 : "",
124 : "shmimName",
125 : false,
126 : "vector<string>",
127 : "The names of the shared memory buffer to stream to. Default is \"shmimLat\"" );
128 :
129 0 : config.add( "circBuffLength",
130 : "L",
131 : "circBuffLength",
132 : argType::Required,
133 : "",
134 : "circBuffLength",
135 : false,
136 : "int",
137 : "The length of the shared memory circular buffer. Default is 1." );
138 :
139 :
140 0 : }
141 :
142 0 : inline void shmimLat::loadConfig()
143 : {
144 0 : std::vector<std::string> shmimNames;
145 0 : config( shmimNames, "shmimName" );
146 :
147 0 : if( shmimNames.size() == 0 )
148 : {
149 0 : std::cerr << "shmim names not specified with -n\n";
150 0 : doHelp = true;
151 0 : return;
152 : }
153 :
154 0 : for( auto &s : shmimNames )
155 : {
156 0 : s_imageThread nt;
157 0 : nt.m_sp = this;
158 0 : nt.m_shmimName = s;
159 0 : m_imageThreads.push_back( nt );
160 0 : }
161 :
162 :
163 0 : }
164 :
165 0 : inline int shmimLat::execute()
166 : {
167 :
168 : // Install signal handling
169 : struct sigaction act;
170 : sigset_t set;
171 :
172 0 : act.sa_sigaction = sigTermHandler;
173 0 : act.sa_flags = SA_SIGINFO;
174 0 : sigemptyset( &set );
175 0 : act.sa_mask = set;
176 :
177 0 : errno = 0;
178 0 : if( sigaction( SIGTERM, &act, 0 ) < 0 )
179 : {
180 0 : std::cerr << " (" << invokedName << "): error setting SIGTERM handler: " << strerror( errno ) << "\n";
181 0 : return -1;
182 : }
183 :
184 0 : errno = 0;
185 0 : if( sigaction( SIGQUIT, &act, 0 ) < 0 )
186 : {
187 0 : std::cerr << " (" << invokedName << "): error setting SIGQUIT handler: " << strerror( errno ) << "\n";
188 0 : return -1;
189 : }
190 :
191 0 : errno = 0;
192 0 : if( sigaction( SIGINT, &act, 0 ) < 0 )
193 : {
194 0 : std::cerr << " (" << invokedName << "): error setting SIGINT handler: " << strerror( errno ) << "\n";
195 0 : return -1;
196 : }
197 :
198 :
199 0 : m_atimes.resize(120000);
200 0 : m_arrtimes.resize(m_atimes.size());
201 :
202 0 : imageThreadExec(&m_imageThreads[0]);
203 :
204 0 : std::ofstream fout("times");
205 :
206 0 : fout.precision(15);
207 :
208 0 : long atime0 = m_atimes[0].tv_sec;
209 :
210 0 : for(size_t n = 0; n < m_atimes.size(); ++n)
211 : {
212 0 : double at = m_atimes[n].tv_sec - atime0 + m_atimes[n].tv_nsec/1e9;
213 0 : double arrt = m_arrtimes[n].tv_sec - atime0 + m_arrtimes[n].tv_nsec/1e9;
214 :
215 0 : fout << at << ' ' << arrt << ' ' << arrt-at << '\n';
216 : }
217 0 : fout.close();
218 0 : return 0;
219 0 : }
220 :
221 : inline void shmimLat::internal_imageThreadStart( s_imageThread *mit )
222 : {
223 : mit->m_sp->imageThreadExec( mit );
224 : }
225 :
226 : inline int shmimLat::imageThreadStart( size_t thno )
227 : {
228 : try
229 : {
230 : *m_imageThreads[thno].m_thread = std::thread( internal_imageThreadStart, &m_imageThreads[thno] );
231 : }
232 : catch( const std::exception &e )
233 : {
234 : std::cerr << std::string( "exception in image thread startup: " ) + e.what() << ' ' << __FILE__ << ' '
235 : << __LINE__ << '\n';
236 : return -1;
237 : }
238 : catch( ... )
239 : {
240 : std::cerr << "unknown exception in image thread startup: " << __FILE__ << ' ' << __LINE__ << '\n';
241 : return -1;
242 : }
243 :
244 : if( !m_imageThreads[thno].m_thread->joinable() )
245 : {
246 : std::cerr << "image thread did not start: " << __FILE__ << ' ' << __LINE__ << '\n';
247 : return -1;
248 : }
249 :
250 : return 0;
251 : }
252 :
253 0 : inline void shmimLat::imageThreadExec( s_imageThread *mit )
254 : {
255 0 : bool opened = false;
256 : bool restart;
257 0 : ino_t inode = 0; ///< The inode of the image stream file
258 0 : int semaphoreNumber = 9; ///< The image structure semaphore index.
259 :
260 : IMAGE imageStream;
261 :
262 0 : mx::improc::eigenImage<float> im;
263 :
264 0 : size_t n = 0;
265 :
266 0 : while( !g_timeToDie )
267 : {
268 : /* Initialize ImageStreamIO
269 : */
270 0 : opened = false;
271 0 : restart = false; // Set this up front, since we're about to restart.
272 :
273 0 : int logged = 0;
274 0 : while( !opened && !g_timeToDie && !restart )
275 : {
276 : // b/c ImageStreamIO prints every single time, and latest version don't support stopping it yet, and that
277 : // isn't thread-safe-able anyway we do our own checks. This is the same code in ImageStreamIO_openIm...
278 : int SM_fd;
279 : char SM_fname[200];
280 0 : ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), mit->m_shmimName.c_str() );
281 0 : SM_fd = open( SM_fname, O_RDWR );
282 0 : if( SM_fd == -1 )
283 : {
284 0 : if( !logged )
285 : {
286 0 : std::cerr << "ImageStream " + mit->m_shmimName + " not found (yet). Retrying . . .\n";
287 : }
288 0 : logged = 1;
289 0 : sleep( 1 ); // be patient
290 0 : continue;
291 : }
292 :
293 : // Found and opened, close it and then use ImageStreamIO
294 0 : logged = 0;
295 0 : close( SM_fd );
296 :
297 0 : if( ImageStreamIO_openIm( &imageStream, mit->m_shmimName.c_str() ) == 0 )
298 : {
299 : /// \todo this isn't right--> isn't there a define in cacao to use?
300 0 : if( imageStream.md[0].sem <= semaphoreNumber )
301 :
302 : {
303 0 : ImageStreamIO_closeIm( &imageStream );
304 0 : mx::sys::sleep( 1 ); // We just need to wait for the server process to finish startup.
305 : }
306 : else
307 : {
308 0 : opened = true;
309 : char SM_fname[200];
310 0 : ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), mit->m_shmimName.c_str() );
311 :
312 : struct stat buffer;
313 0 : int rv = stat( SM_fname, &buffer );
314 :
315 0 : if( rv != 0 )
316 : {
317 0 : std::cerr << "Could not get inode for " + mit->m_shmimName +
318 0 : ". Source process will need to be restarted.\n";
319 0 : ImageStreamIO_closeIm( &imageStream );
320 0 : return;
321 : }
322 0 : inode = buffer.st_ino;
323 : }
324 : }
325 : else
326 : {
327 0 : mx::sys::sleep( 1 ); // be patient
328 : }
329 : }
330 :
331 0 : if( restart )
332 : {
333 0 : continue; // this is kinda dumb. we just go around on restart, so why test in the while loop at all?
334 : }
335 :
336 0 : if( g_timeToDie )
337 : {
338 0 : if( !opened )
339 : {
340 0 : return;
341 : }
342 :
343 0 : ImageStreamIO_closeIm( &imageStream );
344 0 : return;
345 : }
346 :
347 : semaphoreNumber =
348 0 : ImageStreamIO_getsemwaitindex( &imageStream, semaphoreNumber ); // ask for semaphore we had before
349 :
350 0 : if( semaphoreNumber < 0 )
351 : {
352 0 : std::cerr << "No valid semaphore found for " + mit->m_shmimName +
353 0 : ". Source process will need to be restarted.\n";
354 0 : return;
355 : }
356 :
357 0 : ImageStreamIO_semflush( &imageStream, semaphoreNumber );
358 :
359 0 : sem_t *sem = imageStream.semptr[semaphoreNumber]; ///< The semaphore to monitor for new image data
360 :
361 :
362 :
363 :
364 : // This is the main image grabbing loop.
365 0 : while( !g_timeToDie && !restart && n < m_atimes.size())
366 : {
367 : timespec ts;
368 :
369 0 : if( clock_gettime( CLOCK_REALTIME, &ts ) < 0 )
370 : {
371 0 : std::cerr << "error from clock_gettime\n";
372 0 : return;
373 : }
374 :
375 0 : ts.tv_sec += 1;
376 :
377 0 : if( sem_timedwait( sem, &ts ) == 0 )
378 : {
379 :
380 0 : if( g_timeToDie || restart )
381 : {
382 : break; // Check for exit signals
383 : }
384 :
385 0 : clock_gettime(CLOCK_ISIO, &m_arrtimes[n]);
386 0 : m_atimes[n] = imageStream.md[0].writetime;
387 :
388 0 : ++n;
389 : }
390 : else
391 : {
392 0 : if( imageStream.md[0].sem <= 0 )
393 0 : break; // Indicates that the server has cleaned up.
394 :
395 : // Check for why we timed out
396 0 : if( errno == EINTR )
397 0 : break; // This indicates signal interrupted us, time to restart or shutdown, loop will exit normally
398 : // if flags set.
399 :
400 : // ETIMEDOUT means we should check for deletion, and then wait more.
401 : // Otherwise, report an error.
402 0 : if( errno != ETIMEDOUT )
403 : {
404 0 : std::cerr << "error from sem_timedwait\n";
405 0 : break;
406 : }
407 :
408 : // Check if the file has disappeared.
409 : int SM_fd;
410 : char SM_fname[200];
411 0 : ImageStreamIO_filename( SM_fname, sizeof( SM_fname ), mit->m_shmimName.c_str() );
412 0 : SM_fd = open( SM_fname, O_RDWR );
413 0 : if( SM_fd == -1 )
414 : {
415 0 : restart = true;
416 : }
417 0 : close( SM_fd );
418 :
419 : // Check if the inode changed
420 : struct stat buffer;
421 0 : int rv = stat( SM_fname, &buffer );
422 0 : if( rv != 0 )
423 : {
424 0 : restart = true;
425 : }
426 :
427 0 : if( buffer.st_ino != inode )
428 : {
429 0 : restart = true;
430 : }
431 : }
432 : }
433 :
434 0 : if(n >= m_atimes.size())
435 : {
436 0 : break;
437 : }
438 : // opened == true if we can get to this
439 0 : if( semaphoreNumber >= 0 )
440 0 : imageStream.semReadPID[semaphoreNumber] = 0; // release semaphore
441 0 : ImageStreamIO_closeIm( &imageStream );
442 0 : opened = false;
443 :
444 : } // outer loop, will exit if m_shutdown==true
445 :
446 0 : if( opened )
447 : {
448 0 : ImageStreamIO_closeIm( &imageStream );
449 : }
450 0 : }
451 :
452 : #endif // shmimLat_hpp
|