API
 
Loading...
Searching...
No Matches
streamWriter.hpp
Go to the documentation of this file.
1/** \file streamWriter.hpp
2 * \brief The MagAO-X Image Stream Writer
3 *
4 * \author Jared R. Males (jaredmales@gmail.com)
5 *
6 * \ingroup streamWriter_files
7 */
8
9#ifndef streamWriter_hpp
10#define streamWriter_hpp
11
12#include <ImageStreamIO/ImageStruct.h>
13#include <ImageStreamIO/ImageStreamIO.h>
14
15#include <xrif/xrif.h>
16
17#include <mx/sys/timeUtils.hpp>
18
19#include "../../libMagAOX/libMagAOX.hpp" //Note this is included on command line to trigger pch
20#include "../../magaox_git_version.h"
21
22#define NOT_WRITING (0)
23#define START_WRITING (1)
24#define WRITING (2)
25#define STOP_WRITING (3)
26
27namespace MagAOX
28{
29namespace app
30{
31
32/** \defgroup streamWriter ImageStreamIO Stream Writing
33 * \brief Writes the contents of an ImageStreamIO image stream to disk.
34 *
35 * <a href="../handbook/operating/software/apps/streamWriter.html">Application Documentation</a>
36 *
37 * \ingroup apps
38 *
39 */
40
41/** \defgroup streamWriter_files ImageStreamIO Stream Writing
42 * \ingroup streamWriter
43 */
44
45/** MagAO-X application to control writing ImageStreamIO streams to disk.
46 *
47 * \ingroup streamWriter
48 *
49 */
50class streamWriter : public MagAOXApp<>, public dev::telemeter<streamWriter>
51{
53
54 friend class dev::telemeter<streamWriter>;
55
56 // Give the test harness access.
57 friend class streamWriter_test;
58
59protected:
60 /** \name configurable parameters
61 *@{
62 */
63
64 std::string m_rawimageDir; ///< The path where files will be saved.
65
66 size_t m_circBuffLength{1024}; ///< The length of the circular buffer, in frames
67
68 size_t m_writeChunkLength{512}; ///< The number of frames to write at a time
69
70 double m_maxChunkTime{10}; ///< The maximum time, in seconds, before writing regardless of number of frames. Default is 10.
71
72 std::string m_shmimName; ///< The name of the shared memory buffer.
73
74 std::string m_outName; ///< The name to use for outputting files, Default is m_shmimName.
75
76 int m_semaphoreNumber{7}; ///< The image structure semaphore index.
77
78 unsigned m_semWaitSec{0}; ///< The time in whole sec to wait on the semaphore, to which m_semWaitNSec is added. Default is 0 sec.
79
80 unsigned m_semWaitNSec{500000000}; ///< The time in nsec to wait on the semaphore, added to m_semWaitSec. Max is 999999999. Default is 5e8 nsec.
81
82 int m_lz4accel{1}; ///< The LZ4 acceleration parameter (writer.lz4accel). Larger is faster, but lower compression.
83
84 bool m_compress{true}; ///< Flag to set whether compression is used (writer.compress). Default is true.
85
86 ///@}
87
88 size_t m_width{0}; ///< The width of the image
89 size_t m_height{0}; ///< The height of the image
90 uint8_t m_dataType{0}; ///< The ImageStreamIO type code.
91 int m_typeSize{0}; ///< The pixel byte depth
92
93 char *m_rawImageCircBuff{nullptr}; ///< The circular buffer of raw image data.
95
96 size_t m_currImage{0}; ///< The current position in the circular buffer; advanced once per grabbed frame in fgThreadExec.
97
98 double m_currImageTime{0}; ///< The write-time of the current image
99
100 double m_currChunkStartTime{0}; ///< The write-time of the first image in the chunk
101
102 // Writer book-keeping:
103 int m_writing{NOT_WRITING}; ///< Controls whether or not images are being written, and sequences start and stop of writing.
104
105 uint64_t m_currChunkStart{0}; ///< The circular buffer starting position of the current to-be-written chunk.
106 uint64_t m_nextChunkStart{0}; ///< The circular buffer starting position of the next to-be-written chunk.
107
108 uint64_t m_currSaveStart{0}; ///< The circular buffer position at which to start saving.
109 uint64_t m_currSaveStop{0}; ///< The circular buffer position at which to stop saving.
110
111 uint64_t m_currSaveStopFrameNo{0}; ///< The frame number of the image at which saving stopped (for logging)
112
113 /// The xrif compression handle for image data
114 xrif_t m_xrif{nullptr};
115
116 /// Storage for the xrif image data file header
117 char *m_xrif_header{nullptr};
118
119 /// The xrif compression handle for timing data
121
122 /// Storage for the xrif timing data file header
123 char *m_xrif_timing_header{nullptr};
124
125public:
126 /// Default c'tor
127 streamWriter();
128
129 /// Destructor
131
132 /// Setup the configuration system (called by MagAOXApp::setup())
133 virtual void setupConfig();
134
135 /// load the configuration system results (called by MagAOXApp::setup())
136 virtual void loadConfig();
137
138 /// Startup functions
139 /** Creates the save directory, sets up the INDI vars, installs the SIGSEGV/SIGBUS handler, and initializes the writer semaphore and xrif.
140 *
141 */
142 virtual int appStartup();
143
144 /// Implementation of the FSM for the stream writer
145 virtual int appLogic();
146
147 /// Do any needed shutdown tasks. Joins the framegrabber and stream writer threads, then resets the xrif handles.
148 virtual int appShutdown();
149
151 /** \name SIGSEGV & SIGBUS signal handling
152 * These signals occur as a result of an ImageStreamIO source server resetting (e.g. changing frame sizes).
153 * When they occur a restart of the framegrabber and framewriter thread main loops is triggered.
154 *
155 * @{
156 */
157 bool m_restart{false}; ///< Restart flag. Set to true by handlerSigSegv and by fgThreadExec when the stream must be re-opened.
158
159 static streamWriter *m_selfWriter; ///< Static pointer to this (set in constructor). Used for getting out of the static SIGSEGV handler.
160
161 /// Initialize the xrif system.
162 /** Allocates the handles and headers pointers.
163 *
164 * \returns 0 on success.
165 * \returns -1 on error.
166 */
167 int initialize_xrif();
168
169 /// Sets the handler for SIGSEGV and SIGBUS
170 /** These are caused by ImageStreamIO server resets.
171 */
172 int setSigSegvHandler();
173
174 /// The handler called when SIGSEGV or SIGBUS is received, which will be due to ImageStreamIO server resets. Just a wrapper for handlerSigSegv.
175 static void _handlerSigSegv(int signum,
177 void *ucont);
178
179 /// Handles SIGSEGV and SIGBUS. Sets m_restart to true.
180 void handlerSigSegv(int signum,
182 void *ucont);
183 ///@}
184
185 /** \name Framegrabber Thread
186 * This thread monitors the ImageStreamIO buffer and copies its images to the circular buffer.
187 *
188 * @{
189 */
190 int m_fgThreadPrio{1}; ///< Priority of the framegrabber thread, should normally be > 0.
191
192 std::string m_fgCpuset; ///< The cpuset for the framegrabber thread. Ignored if empty (the default).
193
194 std::thread m_fgThread; ///< A separate thread for the actual framegrabbings
195
196 bool m_fgThreadInit{true}; ///< Synchronizer to ensure f.g. thread initializes before doing dangerous things.
197
198 pid_t m_fgThreadID{0}; ///< F.g. thread PID.
199
200 pcf::IndiProperty m_fgThreadProp; ///< The property to hold the f.g. thread details.
201
202 /// Worker function to allocate the circular buffers.
203 /** This takes place in the fg thread after connecting to the stream.
204 *
205 * \returns 0 on success.
206 * \returns -1 on error.
207 */
208 int allocate_circbufs();
209
210 /// Worker function to configure and allocate the xrif handles.
211 /** This takes place in the fg thread after connecting to the stream.
212 *
213 * \returns 0 on success.
214 * \returns -1 on error.
215 */
216 int allocate_xrif();
217
218 /// Thread starter, called by fgThreadStart on thread construction. Calls fgThreadExec.
219 static void fgThreadStart(streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */);
220
221 /// Execute the frame grabber main loop.
222 void fgThreadExec();
223
224 ///@}
225
226 /** \name Stream Writer Thread
227 * This thread writes chunks of the circular buffer to disk.
228 *
229 * @{
230 */
231 int m_swThreadPrio{1}; ///< Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
232
233 std::string m_swCpuset; ///< The cpuset for the stream writer thread. Ignored if empty (the default).
234
235 sem_t m_swSemaphore; ///< Semaphore used to synchronize the fg thread and the sw thread.
236
237 std::thread m_swThread; ///< A separate thread for the actual writing
238
239 bool m_swThreadInit{true}; ///< Synchronizer to ensure s.w. thread initializes before doing dangerous things.
240
241 pid_t m_swThreadID{0}; ///< S.w. thread pid.
242
243 pcf::IndiProperty m_swThreadProp; ///< The property to hold the s.w. thread details.
244
245 size_t m_fnameSz{0}; ///< NOTE(review): presumably the allocated size of m_fname -- confirm in swThreadExec.
246
247 char *m_fname{nullptr}; ///< NOTE(review): presumably the buffer holding the current output file name -- confirm in swThreadExec.
248
249 std::string m_fnameBase; ///< NOTE(review): presumably the common leading part of the output file names -- confirm in swThreadExec.
250
251 /// Thread starter, called by swThreadStart on thread construction. Calls swThreadExec.
252 static void swThreadStart(streamWriter *s /**< [in] a pointer to an streamWriter instance (normally this) */);
253
254 /// Execute the stream writer main loop.
255 void swThreadExec();
256
257 /// Function called when semaphore is raised to do the encode and write.
258 int doEncode();
259 ///@}
260
261 // INDI:
262protected:
263 // declare our properties
264 pcf::IndiProperty m_indiP_writing;
265
266 pcf::IndiProperty m_indiP_xrifStats; ///< The xrif compression performance statistics, registered as "xrif" in appStartup.
267
268public:
270
271 void updateINDI(); ///< Called at the end of each appLogic pass.
272
273 /** \name Telemeter Interface
274 *
275 * @{
276 */
277 int checkRecordTimes();
278
279 int recordTelem(const telem_saving_state *);
280
281 int recordSavingState(bool force = false);
282 int recordSavingStats(bool force = false);
283
284 ///@}
285};
286
287// Set self pointer to null so app starts up uninitialized.
289
290streamWriter::streamWriter() : MagAOXApp(MAGAOX_CURRENT_SHA1, MAGAOX_REPO_MODIFIED)
291{
292 // Register this instance so the static signal handler can reach it.
293 m_selfWriter = this;
294
295 // Power management is not used by this app.
296 m_powerMgtEnabled = false;
297}
298
300{
301 if (m_xrif)
303
304 if (m_xrif_header)
306
307 if (m_xrif_timing)
309
312
313 return;
314}
315
317{ // Registers the writer.* and framegrabber.* options. Type strings and stated defaults match the members filled in loadConfig().
318 config.add("writer.savePath", "", "writer.savePath", argType::Required, "writer", "savePath", false, "string", "The absolute path where images are saved. Will use MagAO-X default if not set.");
319
320 config.add("writer.circBuffLength", "", "writer.circBuffLength", argType::Required, "writer", "circBuffLength", false, "size_t", "The length in frames of the circular buffer. Should be an integer multiple of and larger than writeChunkLength.");
321
322 config.add("writer.writeChunkLength", "", "writer.writeChunkLength", argType::Required, "writer", "writeChunkLength", false, "size_t", "The length in frames of the chunks to write to disk. Should be smaller than circBuffLength.");
323
324 config.add("writer.maxChunkTime", "", "writer.maxChunkTime", argType::Required, "writer", "maxChunkTime", false, "float", "The max length in seconds of the chunks to write to disk. Default is 10 sec.");
325
326 config.add("writer.threadPrio", "", "writer.threadPrio", argType::Required, "writer", "threadPrio", false, "int", "The real-time priority of the stream writer thread.");
327
328 config.add("writer.cpuset", "", "writer.cpuset", argType::Required, "writer", "cpuset", false, "string", "The cpuset for the writer thread.");
329
330 config.add("writer.compress", "", "writer.compress", argType::Required, "writer", "compress", false, "bool", "Flag to set whether compression is used. Default true.");
331
332 config.add("writer.lz4accel", "", "writer.lz4accel", argType::Required, "writer", "lz4accel", false, "int", "The LZ4 acceleration parameter. Larger is faster, but lower compression.");
333
334 config.add("writer.outName", "", "writer.outName", argType::Required, "writer", "outName", false, "string", "The name to use for output files. Default is the shmimName.");
335
336 config.add("framegrabber.shmimName", "", "framegrabber.shmimName", argType::Required, "framegrabber", "shmimName", false, "string", "The name of the stream to monitor. From /tmp/shmimName.im.shm.");
337
338 config.add("framegrabber.semaphoreNumber", "", "framegrabber.semaphoreNumber", argType::Required, "framegrabber", "semaphoreNumber", false, "int", "The semaphore to wait on. Default is 7.");
339
340 config.add("framegrabber.semWait", "", "framegrabber.semWait", argType::Required, "framegrabber", "semWait", false, "int", "The time in nsec to wait on the semaphore. Max is 999999999. Default is 5e8 nsec.");
341
342 config.add("framegrabber.threadPrio", "", "framegrabber.threadPrio", argType::Required, "framegrabber", "threadPrio", false, "int", "The real-time priority of the framegrabber thread.");
343
344 config.add("framegrabber.cpuset", "", "framegrabber.cpuset", argType::Required, "framegrabber", "cpuset", false, "string", "The cpuset for the framegrabber thread.");
345
347}
348
350{
351
352 config(m_circBuffLength, "writer.circBuffLength");
353 config(m_writeChunkLength, "writer.writeChunkLength");
354 config(m_maxChunkTime, "writer.maxChunkTime");
355 config(m_swThreadPrio, "writer.threadPrio");
356 config(m_swCpuset, "writer.cpuset");
357 config(m_compress, "writer.compress");
358 config(m_lz4accel, "writer.lz4accel");
363
364 config(m_shmimName, "framegrabber.shmimName");
365
367 config(m_outName, "writer.outName");
368
369 config(m_semaphoreNumber, "framegrabber.semaphoreNumber");
370 config(m_semWaitNSec, "framegrabber.semWait");
371
372 config(m_fgThreadPrio, "framegrabber.threadPrio");
373 config(m_fgCpuset, "framegrabber.cpuset");
374
375 // Set some defaults
376 // Setup default log path
378 config(m_rawimageDir, "writer.savePath");
379
380 if (telemeterT::loadConfig(config) < 0)
381 {
382 log<text_log>("Error during telemeter config", logPrio::LOG_CRITICAL);
383 m_shutdown = true;
384 }
385}
386
388{
389 // Create save directory.
390 errno = 0;
391 if (mkdir(m_rawimageDir.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH) < 0)
392 {
393 if (errno != EEXIST)
394 {
395 std::stringstream logss;
396 logss << "Failed to create image directory (" << m_rawimageDir << "). Errno says: " << strerror(errno);
398
399 return -1;
400 }
401 }
402
403 // set up the INDI properties
406
407 // Register the stats INDI property
408 REG_INDI_NEWPROP_NOCB(m_indiP_xrifStats, "xrif", pcf::IndiProperty::Number);
409 m_indiP_xrifStats.setLabel("xrif compression performance");
410
411 indi::addNumberElement<float>(m_indiP_xrifStats, "ratio", 0, 1.0, 0.0, "%0.2f", "Compression Ratio");
412
413 indi::addNumberElement<float>(m_indiP_xrifStats, "differenceMBsec", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Differencing Rate [MB/sec]");
414
415 indi::addNumberElement<float>(m_indiP_xrifStats, "reorderMBsec", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Reordering Rate [MB/sec]");
416
417 indi::addNumberElement<float>(m_indiP_xrifStats, "compressMBsec", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Compression Rate [MB/sec]");
418
419 indi::addNumberElement<float>(m_indiP_xrifStats, "encodeMBsec", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Total Encoding Rate [MB/sec]");
420
421 indi::addNumberElement<float>(m_indiP_xrifStats, "differenceFPS", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Differencing Rate [f.p.s.]");
422
423 indi::addNumberElement<float>(m_indiP_xrifStats, "reorderFPS", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Reordering Rate [f.p.s.]");
424
425 indi::addNumberElement<float>(m_indiP_xrifStats, "compressFPS", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Compression Rate [f.p.s.]");
426
427 indi::addNumberElement<float>(m_indiP_xrifStats, "encodeFPS", 0, std::numeric_limits<float>::max(), 0.0, "%0.2f", "Total Encoding Rate [f.p.s.]");
428
429 // Now set up the framegrabber and writer threads.
430 // - need SIGSEGV and SIGBUS handling for ImageStreamIO restarts
431 // - initialize the semaphore
432 // - start the threads
433
434 if (setSigSegvHandler() < 0)
435 return log<software_error, -1>({__FILE__, __LINE__});
436
437 if (sem_init(&m_swSemaphore, 0, 0) < 0)
438 return log<software_critical, -1>({__FILE__, __LINE__, errno, 0, "Initializing S.W. semaphore"});
439
440 // Check if we have a safe writeChunkLength
442 {
443 return log<software_critical, -1>({__FILE__, __LINE__, "Write chunk length is not a divisor of circular buffer length."});
444 }
445
446 if (initialize_xrif() < 0)
448
450 {
451 return log<software_critical, -1>({__FILE__, __LINE__});
452 }
453
455 {
457 }
458
459 if (telemeterT::appStartup() < 0)
460 {
461 return log<software_error, -1>({__FILE__, __LINE__});
462 }
463
464 return 0;
465}
466
468{
469
470 // first do a join check to see if other threads have exited.
471 // these will throw if the threads are really gone
472 try
473 {
474 if (pthread_tryjoin_np(m_fgThread.native_handle(), 0) == 0)
475 {
476 log<software_error>({__FILE__, __LINE__, "framegrabber thread has exited"});
477 return -1;
478 }
479 }
480 catch (...)
481 {
482 log<software_error>({__FILE__, __LINE__, "framegrabber thread has exited"});
483 return -1;
484 }
485
486 try
487 {
488 if (pthread_tryjoin_np(m_swThread.native_handle(), 0) == 0)
489 {
490 log<software_error>({__FILE__, __LINE__, "stream thread has exited"});
491 return -1;
492 }
493 }
494 catch (...)
495 {
496 log<software_error>({__FILE__, __LINE__, "streamwriter thread has exited"});
497 return -1;
498 }
499
500 switch (m_writing)
501 {
502 case NOT_WRITING:
504 break;
505 default:
507 }
508
510 {
511 if (telemeterT::appLogic() < 0)
512 {
514 return 0;
515 }
516 }
517
518 updateINDI();
519
520 return 0;
521}
522
524{
525 try
526 {
527 if (m_fgThread.joinable())
528 {
529 m_fgThread.join();
530 }
531 }
532 catch (...)
533 {
534 }
535
536 try
537 {
538 if (m_swThread.joinable())
539 {
540 m_swThread.join();
541 }
542 }
543 catch (...)
544 {
545 }
546
547 if (m_xrif)
548 {
550 m_xrif = nullptr;
551 }
552
553 if (m_xrif_timing)
554 {
556 m_xrif_timing = nullptr;
557 }
558
560
561 return 0;
562}
563
565{
567 if (rv != XRIF_NOERROR)
568 {
569 return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error."});
570 }
571
572 if (m_compress)
573 {
575 if (rv != XRIF_NOERROR)
576 {
577 return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle configuration error."});
578 }
579 }
580 else
581 {
582 std::cerr << "not compressing . . . \n";
584 if (rv != XRIF_NOERROR)
585 {
586 return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle configuration error."});
587 }
588 }
589
590 errno = 0;
591 m_xrif_header = (char *)malloc(XRIF_HEADER_SIZE * sizeof(char));
592 if (m_xrif_header == NULL)
593 {
594 return log<software_critical, -1>({__FILE__, __LINE__, errno, 0, "xrif header allocation failed."});
595 }
596
598 if (rv != XRIF_NOERROR)
599 {
600 return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle allocation or initialization error."});
601 }
602
603 // m_xrif_timing->reorder_method = XRIF_REORDER_NONE;
605 if (rv != XRIF_NOERROR)
606 {
607 return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle configuration error."});
608 }
609
610 errno = 0;
611 m_xrif_timing_header = (char *)malloc(XRIF_HEADER_SIZE * sizeof(char));
613 {
614 return log<software_critical, -1>({__FILE__, __LINE__, errno, 0, "xrif header allocation failed."});
615 }
616
617 return 0;
618}
619
621{
622 struct sigaction act;
623 sigset_t set;
624
625 act.sa_sigaction = &streamWriter::_handlerSigSegv;
626 act.sa_flags = SA_SIGINFO;
627 sigemptyset(&set);
628 act.sa_mask = set;
629
630 errno = 0;
631 if (sigaction(SIGSEGV, &act, 0) < 0)
632 {
633 std::string logss = "Setting handler for SIGSEGV failed. Errno says: ";
634 logss += strerror(errno);
635
637
638 return -1;
639 }
640
641 errno = 0;
642 if (sigaction(SIGBUS, &act, 0) < 0)
643 {
644 std::string logss = "Setting handler for SIGBUS failed. Errno says: ";
645 logss += strerror(errno);
646
648
649 return -1;
650 }
651
652 log<text_log>("Installed SIGSEGV/SIGBUS signal handler.", logPrio::LOG_DEBUG);
653
654 return 0;
655}
656
658 siginfo_t *siginf,
659 void *ucont)
660{
662}
663
665 siginfo_t *siginf,
666 void *ucont)
667{
668 static_cast<void>(signum);
669 static_cast<void>(siginf);
670 static_cast<void>(ucont);
671
672 m_restart = true;
673
674 return;
675}
676
678{
680 {
682 }
683
684 errno = 0;
686
688 {
689 return log<software_critical, -1>({__FILE__, __LINE__, errno, 0, "buffer allocation failure"});
690 }
691
693 {
695 }
696
697 errno = 0;
699 if (m_timingCircBuff == NULL)
700 {
701 return log<software_critical, -1>({__FILE__, __LINE__, errno, 0, "buffer allocation failure"});
702 }
703
704 return 0;
705}
706
708{
709 // Set up the image data xrif handle
711
712 if (m_compress)
713 {
715 if (rv != XRIF_NOERROR)
716 {
717 return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle configuration error."});
718 }
719 }
720 else
721 {
722 std::cerr << "not compressing . . . \n";
724 if (rv != XRIF_NOERROR)
725 {
726 return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle configuration error."});
727 }
728 }
729
731 if (rv != XRIF_NOERROR)
732 {
733 return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif_set_size error."});
734 }
735
737 if (rv != XRIF_NOERROR)
738 {
739 return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif_allocate_raw error."});
740 }
741
743 if (rv != XRIF_NOERROR)
744 {
745 return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error."});
746 }
747
748 // Set up the timing data xrif handle
750 if (rv != XRIF_NOERROR)
751 {
752 return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif handle configuration error."});
753 }
754
756 if (rv != XRIF_NOERROR)
757 {
758 return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif_set_size error."});
759 }
760
762 if (rv != XRIF_NOERROR)
763 {
764 return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif_allocate_raw error."});
765 }
766
768 if (rv != XRIF_NOERROR)
769 {
770 return log<software_critical, -1>({__FILE__, __LINE__, 0, rv, "xrif_allocate_reordered error."});
771 }
772
773 return 0;
774}
775
777{
778 o->fgThreadExec();
779}
780
782{
784
785 // Wait for the thread starter to finish initializing this thread.
786 while (m_fgThreadInit == true && m_shutdown == 0)
787 {
788 sleep(1);
789 }
790
791 timespec missing_ts;
792
793 IMAGE image;
794 ino_t inode = 0; // The inode of the image stream file
795
796 bool opened = false;
797
798 while (m_shutdown == 0)
799 {
800 /* Initialize ImageStreamIO
801 */
802 opened = false;
803 m_restart = false; // Set this up front, since we're about to restart.
804
805 sem_t *sem{nullptr}; ///< The semaphore to monitor for new image data
806
807 int logged = 0;
808 while (!opened && !m_shutdown && !m_restart)
809 {
810 // b/c ImageStreamIO prints every single time, and latest version don't support stopping it yet, and that isn't thread-safe-able anyway
811 // we do our own checks. This is the same code in ImageStreamIO_openIm...
812 int SM_fd;
813 char SM_fname[200];
815 SM_fd = open(SM_fname, O_RDWR);
816 if (SM_fd == -1)
817 {
818 if (!logged)
819 log<text_log>("ImageStream " + m_shmimName + " not found (yet). Retrying . . .", logPrio::LOG_NOTICE);
820 logged = 1;
821 sleep(1); // be patient
822 continue;
823 }
824
825 // Found and opened, close it and then use ImageStreamIO
826 logged = 0;
827 close(SM_fd);
828
829 if (ImageStreamIO_openIm(&image, m_shmimName.c_str()) == 0)
830 {
831 if (image.md[0].sem <= m_semaphoreNumber) ///<\todo this isn't right--> isn't there a define in cacao to use?
832 {
834 mx::sys::sleep(1); // We just need to wait for the server process to finish startup.
835 }
836 else
837 {
838 opened = true;
839
840 char SM_fname[200];
842
843 struct stat buffer;
844 int rv = stat(SM_fname, &buffer);
845
846 if (rv != 0)
847 {
848 log<software_critical>({__FILE__, __LINE__, errno, "Could not get inode for " + m_shmimName + ". Source process will need to be restarted."});
850 return;
851 }
852 inode = buffer.st_ino;
853 }
854 }
855 else
856 {
857 mx::sys::sleep(1); // be patient
858 }
859 }
860
861 if (m_restart)
862 continue; // this is kinda dumb. we just go around on restart, so why test in the while loop at all?
863
864 if (m_shutdown || !opened)
865 {
866 if (!opened)
867 return;
868
870 return;
871 }
872
873 // now get a good semaphore
874 m_semaphoreNumber = ImageStreamIO_getsemwaitindex(&image, m_semaphoreNumber); // ask for semaphore we had before
875
876 if (m_semaphoreNumber < 0)
877 {
878 log<software_critical>({__FILE__, __LINE__, "No valid semaphore found for " + m_shmimName + ". Source process will need to be restarted."});
879 return;
880 }
881
882 log<software_info>({__FILE__, __LINE__, "got semaphore index " + std::to_string(m_semaphoreNumber) + " for " + m_shmimName});
883
885
886 sem = image.semptr[m_semaphoreNumber]; ///< The semaphore to monitor for new image data
887
888 m_dataType = image.md[0].datatype;
890 m_width = image.md[0].size[0];
891 m_height = image.md[0].size[1];
892 size_t length;
893 if (image.md[0].naxis == 3)
894 {
895 length = image.md[0].size[2];
896 }
897 else
898 {
899 length = 1;
900 }
901 std::cerr << "connected"
902 << " " << m_width << "x" << m_height << "x" << (int)m_dataType << " (" << m_typeSize << ")" << std::endl;
903
904 // Now allocate the circBuffs
905 if (allocate_circbufs() < 0)
906 return; // will cause shutdown!
907
908 // And allocate the xrifs
909 if (allocate_xrif() < 0)
910 return; // Will cause shutdown!
911
913 size_t snx, sny, snz;
914
915 uint64_t curr_image; // The current cnt1 index
916 m_currImage = 0;
919
920 // Initialized curr_image ...
921 if (image.md[0].naxis > 2)
922 {
923 curr_image = image.md[0].cnt1;
924 }
925 else
926 {
927 curr_image = 0;
928 }
929
930 uint64_t last_cnt0; // = ((uint64_t)-1);
931
932 // so we can initialize last_cnt0 to avoid frame skip on startup
933 if (image.cntarray)
934 {
935 last_cnt0 = image.cntarray[curr_image];
936 }
937 else
938 {
939 last_cnt0 = image.md[0].cnt0;
940 }
941
942 int cnt0flag = 0;
943
944 bool restartWriting = false; // flag to prevent logging on a logging restart
945
946 // This is the main image grabbing loop.
947 while (!m_shutdown && !m_restart)
948 {
949 timespec ts;
951
952 if(sem_timedwait(sem, &ts) == 0)
953 {
954 if (image.md[0].naxis > 2)
955 {
956 curr_image = image.md[0].cnt1;
957 }
958 else
959 {
960 curr_image = 0;
961 }
962
963 atype = image.md[0].datatype;
964 snx = image.md[0].size[0];
965 sny = image.md[0].size[1];
966 if (image.md[0].naxis == 3)
967 {
968 snz = image.md[0].size[2];
969 }
970 else
971 {
972 snz = 1;
973 }
974
975 if (atype != m_dataType || snx != m_width || sny != m_height || snz != length)
976 {
977 break; // exit the nearest while loop and get the new image setup.
978 }
979
980 if (m_shutdown || m_restart)
981 {
982 break; // Check for exit signals
983 }
984
986 if (image.cntarray)
987 {
988 new_cnt0 = image.cntarray[curr_image];
989 }
990 else
991 {
992 new_cnt0 = image.md[0].cnt0;
993 }
994
995 #ifdef SW_DEBUG
996 std::cerr << "new_cnt0: " << new_cnt0 << "\n";
997 #endif
998
999 ///\todo cleanup skip frame handling.
1000 if (new_cnt0 == last_cnt0) //<- this probably isn't useful really
1001 {
1002 log<text_log>("semaphore raised but cnt0 has not changed -- we're probably getting behind", logPrio::LOG_WARNING);
1003 ++cnt0flag;
1004 if (cnt0flag > 10)
1005 {
1006 m_restart = true; // if we get here 10 times then something else is wrong.
1007 }
1008 continue;
1009 }
1010
1011 if (new_cnt0 - last_cnt0 > 1) //<- this is what we want to check.
1012 {
1013 log<text_log>("cnt0 changed by more than 1. Frame skipped.", logPrio::LOG_WARNING);
1014 }
1015
1016 cnt0flag = 0;
1017
1019
1021 char *curr_src = (char *)image.array.raw + curr_image * m_width * m_height * m_typeSize;
1022
1024
1026
1027 if (image.cntarray)
1028 {
1029 curr_timing[0] = image.cntarray[curr_image];
1030 curr_timing[1] = image.atimearray[curr_image].tv_sec;
1031 curr_timing[2] = image.atimearray[curr_image].tv_nsec;
1032 curr_timing[3] = image.writetimearray[curr_image].tv_sec;
1033 curr_timing[4] = image.writetimearray[curr_image].tv_nsec;
1034 }
1035 else
1036 {
1037 curr_timing[0] = image.md[0].cnt0;
1038 curr_timing[1] = image.md[0].atime.tv_sec;
1039 curr_timing[2] = image.md[0].atime.tv_nsec;
1040 curr_timing[3] = image.md[0].writetime.tv_sec;
1041 curr_timing[4] = image.md[0].writetime.tv_nsec;
1042 }
1043
1044 // Check if we need to time-stamp ourselves -- for old cacao streams
1045 if (curr_timing[1] == 0)
1046 {
1047
1049 {
1050 log<software_critical>({__FILE__, __LINE__, errno, 0, "clock_gettime"});
1051 return;
1052 }
1053
1054 curr_timing[1] = missing_ts.tv_sec;
1055 curr_timing[2] = missing_ts.tv_nsec;
1056 }
1057
1058 // just set w-time to a-time if it's missing
1059 if (curr_timing[3] == 0)
1060 {
1061 curr_timing[3] = curr_timing[1];
1062 curr_timing[4] = curr_timing[2];
1063 }
1064
1065 m_currImageTime = 1.0 * curr_timing[3] + (1.0 * curr_timing[4]) / 1e9;
1066
1067 if (m_shutdown && m_writing == WRITING)
1068 {
1070 }
1071
1072 switch (m_writing)
1073 {
1074 case START_WRITING:
1075
1079
1080 if(!restartWriting) //We only log if this is really a start
1081 {
1083 }
1084 else //on a restart after a timeout we don't log
1085 {
1086 restartWriting = false;
1087 }
1088
1090
1091 // fall through
1092 case WRITING:
1094 {
1098
1099 #ifdef SW_DEBUG
1100 std::cerr << __FILE__ << " " << __LINE__ << " WRITING " << m_currImage << " "
1101 << m_nextChunkStart << " "
1104 << new_cnt0 << "\n";
1105 #endif
1106
1107 // Now tell the writer to get going
1108 if (sem_post(&m_swSemaphore) < 0)
1109 {
1110 log<software_critical>({__FILE__, __LINE__, errno, 0, "Error posting to semaphore"});
1111 return;
1112 }
1113
1116 {
1117 m_nextChunkStart = 0;
1118 }
1119
1122 }
1124 {
1128
1129 #ifdef SW_DEBUG
1130 std::cerr << __FILE__ << " " << __LINE__ << " IMAGE TIME WRITING " << m_currImage << " "
1131 << m_nextChunkStart << " "
1134 << new_cnt0 << "\n";
1135 #endif
1136
1137 // Now tell the writer to get going
1138 if (sem_post(&m_swSemaphore) < 0)
1139 {
1140 log<software_critical>({__FILE__, __LINE__, errno, 0, "Error posting to semaphore"});
1141 return;
1142 }
1143
1145 restartWriting = true;
1146
1147 }
1148 break;
1149
1150 case STOP_WRITING:
1154
1155 #ifdef SW_DEBUG
1156 std::cerr << __FILE__ << " " << __LINE__ << " STOP_WRITING\n";
1157 #endif
1158
1159 // Now tell the writer to get going
1160 if (sem_post(&m_swSemaphore) < 0)
1161 {
1162 log<software_critical>({__FILE__, __LINE__, errno, 0, "Error posting to semaphore"});
1163 return;
1164 }
1165 restartWriting = false;
1166 break;
1167
1168 default:
1169 break;
1170 }
1171
1172 ++m_currImage;
1174 {
1175 m_currImage = 0;
1176 }
1177 }
1178 else
1179 {
1180 // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1181 //we can also get here if a signal interrupts the sem wait which is triggered by INDI callbacks
1182 switch (m_writing)
1183 {
1184 case WRITING:
1185 // Here, if there is at least 1 image, we check for delta-time > m_maxChunkTime
1186 // then write
1187 if ((m_currImage - m_nextChunkStart > 0) && (mx::sys::get_curr_time() - m_currChunkStartTime > m_maxChunkTime))
1188 {
1192
1193 #ifdef SW_DEBUG
1194 std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT WRITING " << " "
1195 << m_currImage << " " << m_nextChunkStart << " " <<(m_currImage - m_nextChunkStart) << " "
1196 << last_cnt0 << "\n";
1197 #endif
1198
1199 // Now tell the writer to get going
1200 if (sem_post(&m_swSemaphore) < 0)
1201 {
1202 log<software_critical>({__FILE__, __LINE__, errno, 0, "Error posting to semaphore"});
1203 return;
1204 }
1205
1207 restartWriting = true;
1208
1209 }
1210 break;
1211 case STOP_WRITING:
1212 // If we timed-out while STOP_WRITING is set, we trigger a write.
1216
1217 #ifdef SW_DEBUG
1218 std::cerr << __FILE__ << " " << __LINE__ << " TIMEOUT STOP_WRITING\n";
1219 #endif
1220
1221 // Now tell the writer to get going
1222 if (sem_post(&m_swSemaphore) < 0)
1223 {
1224 log<software_critical>({__FILE__, __LINE__, errno, 0, "Error posting to semaphore"});
1225 return;
1226 }
1227 restartWriting = false;
1228 break;
1229 default:
1230 break;
1231 }
1232
1233 if (image.md[0].sem <= 0)
1234 {
1235 break; // Indicates that the server has cleaned up.
1236 }
1237
1238 // Check for why we timed out
1239 if (errno == EINTR)
1240 {
1241 break; // This will indicate time to shutdown, loop will exit normally flags set.
1242 }
1243
1244 // ETIMEDOUT just means we should wait more.
1245 // Otherwise, report an error.
1246 if (errno != ETIMEDOUT)
1247 {
1248 log<software_error>({__FILE__, __LINE__, errno, "sem_timedwait"});
1249 break;
1250 }
1251
1252 // Check if the file has disappeared.
1253 int SM_fd;
1254 char SM_fname[200];
1256 SM_fd = open(SM_fname, O_RDWR);
1257 if (SM_fd == -1)
1258 {
1259 m_restart = true;
1260 }
1261 close(SM_fd);
1262
1263 // Check if the inode changed
1264 struct stat buffer;
1265 int rv = stat(SM_fname, &buffer);
1266 if (rv != 0)
1267 {
1268 m_restart = true;
1269 }
1270
1271 if (buffer.st_ino != inode)
1272 {
1273 #ifdef SW_DEBUG
1274 std::cerr << "Restarting due to inode . . . \n";
1275 #endif
1276 m_restart = true;
1277 }
1278 }
1279 }
1280
1281 ///\todo might still be writing here, so must check
1282 // If semaphore times-out or errors, we first cleanup any writing that needs to be done
1284 {
1285 // Here, if there is at least 1 image, then write
1286 if ((m_currImage - m_nextChunkStart > 0))
1287 {
1291
1293
1294 std::cerr << __FILE__ << " " << __LINE__ << " WRITING ON RESTART " << last_cnt0 << "\n";
1295 // Now tell the writer to get going
1296 if (sem_post(&m_swSemaphore) < 0)
1297 {
1298 log<software_critical>({__FILE__, __LINE__, errno, 0, "Error posting to semaphore"});
1299 return;
1300 }
1301 }
1302 else
1303 {
1305 }
1306
1307
1308 while(m_writing != NOT_WRITING)
1309 {
1310 std::cerr << __FILE__ << " " << __LINE__ << " WAITING TO FINISH WRITING " << last_cnt0 << "\n";
1311 sleep(1);
1312 }
1313 }
1314
1316 {
1319 }
1320
1321 if (m_timingCircBuff)
1322 {
1324 m_timingCircBuff = 0;
1325 }
1326
1327 if (opened)
1328 {
1329 if (m_semaphoreNumber >= 0)
1330 {
1331 ///\todo is this release necessary with closeIM?
1332 image.semReadPID[m_semaphoreNumber] = 0; // release semaphore
1333 }
1335 opened = false;
1336 }
1337
1338 } // outer loop, will exit if m_shutdown==true
1339
1340 // One more check
1342 {
1345 }
1346
1347 if (m_timingCircBuff)
1348 {
1350 m_timingCircBuff = 0;
1351 }
1352
1353 if (opened)
1354 {
1355 if (m_semaphoreNumber >= 0)
1356 {
1357 ///\todo is this release necessary with closeIM?
1358 image.semReadPID[m_semaphoreNumber] = 0; // release semaphore.
1359 }
1360
1362 }
1363}
1364
1366{
// Thread entry point: forward to the stream-writer loop of the instance passed in as `s`.
1367 s->swThreadExec();
1368}
1369
1371{
// Stream-writer thread body. Loops until shutdown, waiting on m_swSemaphore
// and calling doEncode() each time the semaphore is acquired. The output
// file-name buffer (m_fname) is owned here: it is (re)built whenever it is
// null and released on every exit path visible in this function.
1373
1374 // Wait for the thread starter to finish initializing this thread.
1375 while (m_swThreadInit == true && m_shutdown == 0)
1376 {
1377 sleep(1);
1378 }
1379
1380 while (!m_shutdown)
1381 {
// Idle while the app is neither READY nor OPERATING. The file-name buffer
// is dropped here so that it is rebuilt below from the current
// m_rawimageDir / m_outName once the state becomes usable again.
1382 while (!shutdown() && (!(state() == stateCodes::READY || state() == stateCodes::OPERATING)))
1383 {
1384 if (m_fname)
1385 {
1386 free(m_fname);
1387 m_fname = nullptr;
1388 }
1389 sleep(1);
1390 }
1391
1392 if (shutdown())
1393 {
1394 break;
1395 }
1396
1397 // This will happen after a reconnection, and could update m_shmimName, etc.
1398 if (m_fname == nullptr)
1399 {
1400 m_fnameBase = m_rawimageDir + "/" + m_outName + "_";
1401
1402 m_fnameSz = m_fnameBase.size() + sizeof("YYYYMMDDHHMMSSNNNNNNNNN.xrif"); // the sizeof includes the \0
// NOTE(review): the malloc result is not checked before it is passed to snprintf.
1403 m_fname = (char *)malloc(m_fnameSz);
1404
// Pre-fill with a placeholder timestamp; doEncode() overwrites the
// timestamp field in place (starting at offset m_fnameBase.size()).
1405 snprintf(m_fname, m_fnameSz, "%sYYYYMMDDHHMMSSNNNNNNNNN.xrif", m_fnameBase.c_str());
1406 }
1407
1408 // At this point m_fname has been assigned (see the note on malloc above).
1409
1410 timespec ts;
1411
1412 if (clock_gettime(CLOCK_REALTIME, &ts) < 0)
1413 {
1414 log<software_critical>({__FILE__, __LINE__, errno, 0, "clock_gettime"});
1415
1416 free(m_fname);
1417 m_fname = nullptr;
1418
1419 return; // will trigger a shutdown
1420 }
1421
// Absolute deadline for the wait: now + m_semWaitNSec.
// NOTE(review): only m_semWaitNSec is added here; m_semWaitSec is not used
// for this wait -- confirm that is intended.
1422 mx::sys::timespecAddNsec(ts, m_semWaitNSec);
1423
1424 if (sem_timedwait(&m_swSemaphore, &ts) == 0)
1425 {
// Semaphore acquired: encode and write. On its fopen-failure path
// doEncode() has already freed m_fname and set it to nullptr.
1426 if (doEncode() < 0)
1427 {
1428 log<software_critical>({__FILE__, __LINE__, "error encoding data"});
1429 return;
1430 }
1431 // Otherwise, success, and we just go on.
1432 }
1433 else
1434 {
1435 // Check why the wait failed
1436 if (errno == EINTR)
1437 {
1438 continue; // This will probably indicate time to shutdown, loop will exit normally if flags set.
1439 }
1440
1441 // ETIMEDOUT just means we should wait more.
1442 // Otherwise, report an error.
1443 if (errno != ETIMEDOUT)
1444 {
1445 log<software_error>({__FILE__, __LINE__, errno, "sem_timedwait"});
1446 break;
1447 }
1448 }
1449 } // outer loop, will exit if m_shutdown==true
1450
// Release the file-name buffer on normal exit from the loop.
1451 if (m_fname)
1452 {
1453 free(m_fname);
1454 m_fname = nullptr;
1455 }
1456}
1457
1459{
// Encode the pending chunk with xrif and write it to a time-stamped file.
// Returns 0 when there is nothing to do (m_writing == NOT_WRITING) and after a
// write attempt, even if individual xrif/fwrite steps logged an alert.
// Returns -1 only when the output file cannot be opened; in that case m_fname
// has been freed and set to nullptr.
1460 if (m_writing == NOT_WRITING)
1461 {
1462 return 0;
1463 }
1464
1465 recordSavingState(true);
1466
1467 // Record these to prevent a change in other thread
1470 size_t nFrames = m_currSaveStop - saveStart;
1471 size_t nBytes = m_width * m_height * m_typeSize;
1472
1473 #ifdef SW_DEBUG
1474 std::cerr << "nFrames: " << nFrames << "\n";
1475 #endif
1476
1477 // Configure xrif and copy image data -- this does no allocations
1479 if (rv != XRIF_NOERROR)
1480 {
1481 // This is a big problem. Report it as "ALERT" and go on.
1482 log<software_alert>({__FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST"});
1483 }
1484
1486 if (rv != XRIF_NOERROR)
1487 {
1488 // This may just be out of range, it's only an error.
1489 log<software_error>({__FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error."});
1490 }
1491
1493
1494 // Configure xrif and copy timing data -- no allocations
1496 if (rv != XRIF_NOERROR)
1497 {
1498 // This is a big problem. Report it as "ALERT" and go on.
1499 log<software_alert>({__FILE__, __LINE__, 0, rv, "xrif set size error. DATA POSSIBLY LOST."});
1500 }
1501
1503 if (rv != XRIF_NOERROR)
1504 {
1505 // This may just be out of range, it's only an error.
1506 log<software_error>({__FILE__, __LINE__, 0, rv, "xrif set LZ4 acceleration error."});
1507 }
1508
1509 #ifdef SW_DEBUG
1510 for (size_t nF = 0; nF < nFrames; ++nF)
1511 {
1512 std::cerr << " " << (m_timingCircBuff + saveStart * 5 + nF * 5)[0] << "\n";
1513 }
1514 #endif
1515
// The timing buffer is indexed with a stride of 5 uint64_t words per frame;
// copy the nFrames rows that begin at frame index saveStart.
1516 memcpy(m_xrif_timing->raw_buffer, m_timingCircBuff + saveStart * 5, nFrames * 5 * sizeof(uint64_t));
1517
1519 if (rv != XRIF_NOERROR)
1520 {
1521 // This is a big problem. Report it as "ALERT" and go on.
1522 log<software_alert>({__FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST."});
1523 }
1524
1526 if (rv != XRIF_NOERROR)
1527 {
1528 // This is a big problem. Report it as "ALERT" and go on.
1529 log<software_alert>({__FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST."});
1530 }
1531
1533 if (rv != XRIF_NOERROR)
1534 {
1535 // This is a big problem. Report it as "ALERT" and go on.
1536 log<software_alert>({__FILE__, __LINE__, 0, rv, "xrif encode error. DATA POSSIBLY LOST."});
1537 }
1538
1540 if (rv != XRIF_NOERROR)
1541 {
1542 // This is a big problem. Report it as "ALERT" and go on.
1543 log<software_alert>({__FILE__, __LINE__, 0, rv, "xrif write header error. DATA POSSIBLY LOST"});
1544 }
1545
1546 // Now break down the acq time of the first image in the buffer for use in file name
1547 tm uttime; // The broken down time.
// The words starting at offset 1 of the first saved timing row are
// reinterpreted as a timespec.
1548 timespec *fts = (timespec *)(m_timingCircBuff + saveStart * 5 + 1);
1549
// NOTE(review): if gmtime_r fails, uttime is left unset but is still
// formatted into the file name below.
1550 if (gmtime_r(&fts->tv_sec, &uttime) == 0)
1551 {
1552 // Yell at operator but keep going
1553 log<software_alert>({__FILE__, __LINE__, errno, 0, "gmtime_r error. possible loss of timing information."});
1554 }
1555
1556 // Available size = m_fnameSz-m_fnameBase.size(), rather than assuming sizeof("YYYYMMDDHHMMSSNNNNNNNNN"), in case we screwed up somewhere.
1557 rv = snprintf(m_fname + m_fnameBase.size(), m_fnameSz - m_fnameBase.size(), "%04i%02i%02i%02i%02i%02i%09i", uttime.tm_year + 1900,
1558 uttime.tm_mon + 1, uttime.tm_mday, uttime.tm_hour, uttime.tm_min, uttime.tm_sec, static_cast<int>(fts->tv_nsec));
1559
1560 if (rv != sizeof("YYYYMMDDHHMMSSNNNNNNNNN") - 1)
1561 {
1562 // Something is very wrong. Keep going to try to get it on disk.
1563 log<software_alert>({__FILE__, __LINE__, errno, rv, "did not write enough chars to timestamp"});
1564 }
1565
1566 // Cover up the \0 inserted by snprintf
// 23 is the length of "YYYYMMDDHHMMSSNNNNNNNNN", i.e. the position of the '.' in ".xrif".
1567 (m_fname + m_fnameBase.size())[23] = '.';
1568
1569 FILE *fp_xrif = fopen(m_fname, "wb");
1570 if (fp_xrif == NULL)
1571 {
1572 // This is it. If we can't write data to disk need to fix.
1573 log<software_alert>({__FILE__, __LINE__, errno, 0, "failed to open file for writing"});
1574
1575 free(m_fname);
1576 m_fname = nullptr;
1577
1578 return -1; // will trigger a shutdown
1579 }
1580
1582
1583 if (bw != XRIF_HEADER_SIZE)
1584 {
1585 log<software_alert>({__FILE__, __LINE__, errno, 0, "failure writing header to file. DATA LOSS LIKELY. bytes = " + std::to_string(bw)});
1586 // We go on . . .
1587 }
1588
// Image payload: compressed_size bytes from the image handle's buffer.
1589 bw = fwrite(m_xrif->raw_buffer, sizeof(uint8_t), m_xrif->compressed_size, fp_xrif);
1590
1591 if (bw != m_xrif->compressed_size)
1592 {
1593 log<software_alert>({__FILE__, __LINE__, errno, 0, "failure writing data to file. DATA LOSS LIKELY. bytes = " + std::to_string(bw)});
1594 }
1595
1597
1598 if (bw != XRIF_HEADER_SIZE)
1599 {
1600 log<software_alert>({__FILE__, __LINE__, errno, 0, "failure writing timing header to file. DATA LOSS LIKELY. bytes = " + std::to_string(bw)});
1601 }
1602
// Timing payload: compressed_size bytes from the timing handle's buffer.
1603 bw = fwrite(m_xrif_timing->raw_buffer, sizeof(uint8_t), m_xrif_timing->compressed_size, fp_xrif);
1604
1605 if (bw != m_xrif_timing->compressed_size)
1606 {
1607 log<software_alert>({__FILE__, __LINE__, errno, 0, "failure writing timing data to file. DATA LOSS LIKELY. bytes = " + std::to_string(bw)});
1608 }
1609
// NOTE(review): the fclose return value is not checked, so a failure while
// flushing buffered data would go unreported.
1610 fclose(fp_xrif);
1611
1612 recordSavingStats(true);
1613
1614 if (m_writing == STOP_WRITING)
1615 {
1618 }
1619
1620 recordSavingState(true);
1621
1622 return 0;
1623
1624} // doEncode
1625
1626INDI_NEWCALLBACK_DEFN(streamWriter, m_indiP_writing)
1627(const pcf::IndiProperty &ipRecv)
1628{
1629 INDI_VALIDATE_CALLBACK_PROPS(m_indiP_writing, ipRecv);
1630
1631 if (!ipRecv.find("toggle"))
1632 {
1633 return 0;
1634 }
1635
1636 if (ipRecv["toggle"].getSwitchState() == pcf::IndiElement::Off && (m_writing == WRITING || m_writing == START_WRITING))
1637 {
1638 m_writing = STOP_WRITING;
1639 }
1640
1641 if (ipRecv["toggle"].getSwitchState() == pcf::IndiElement::On && m_writing == NOT_WRITING)
1642 {
1643 m_writing = START_WRITING;
1644 }
1645
1646 return 0;
1647}
1648
1650{
// Publish the writing toggle and the xrif statistics over INDI.
1651 // Only update this if not changing
1653 {
1654 if (m_xrif && m_writing == WRITING)
1655 {
// Actively writing: show the toggle as On and report the xrif rates.
// The *MBsec values are the xrif rates divided by 2^20 (presumably
// bytes/sec -> MB/sec; confirm the units against xrif). differenceFPS
// divides the difference rate by m_width * m_height * m_typeSize.
1656 indi::updateSwitchIfChanged(m_indiP_writing, "toggle", pcf::IndiElement::On, m_indiDriver, INDI_OK);
1658 indi::updateIfChanged(m_indiP_xrifStats, "encodeMBsec", m_xrif->encode_rate / 1048576.0, m_indiDriver, INDI_BUSY);
1660 indi::updateIfChanged(m_indiP_xrifStats, "differenceMBsec", m_xrif->difference_rate / 1048576.0, m_indiDriver, INDI_BUSY);
1661 indi::updateIfChanged(m_indiP_xrifStats, "differenceFPS", m_xrif->difference_rate / (m_width * m_height * m_typeSize), m_indiDriver, INDI_BUSY);
1662 indi::updateIfChanged(m_indiP_xrifStats, "reorderMBsec", m_xrif->reorder_rate / 1048576.0, m_indiDriver, INDI_BUSY);
1664 indi::updateIfChanged(m_indiP_xrifStats, "compressMBsec", m_xrif->compress_rate / 1048576.0, m_indiDriver, INDI_BUSY);
1666 }
1667 else
1668 {
// No xrif handle, or not in the WRITING state: show the toggle as Off.
1669 indi::updateSwitchIfChanged(m_indiP_writing, "toggle", pcf::IndiElement::Off, m_indiDriver, INDI_OK);
1679 }
1680 }
1681}
1682
1687
1689{
// Force a saving-state telemetry record and return the result of recordSavingState.
1690 return recordSavingState(true);
1691}
1692
1694{
// Record the saving state in telemetry. Always returns 0.
// The function-local statics persist across calls; lastState is refreshed
// inside the conditional block below.
1695 static int16_t lastState = -1;
1696 static uint64_t currSaveStart = -1;
1697
// state is 1 for any of WRITING, START_WRITING or STOP_WRITING, and 0 otherwise.
1698 int16_t state;
1699 if (m_writing == WRITING || m_writing == START_WRITING || m_writing == STOP_WRITING) // Changed from just writing 5/2024
1700 state = 1;
1701 else
1702 state = 0;
1703
1705 {
1707
1708 lastState = state;
1710 }
1711
1712 return 0;
1713}
1714
1716{
// Record the xrif statistics of the image handle (m_xrif) in telemetry when
// any of them differs from the values cached at the last record, or when
// `force` is true. Always returns 0.
// The statics below cache the last recorded values; -1 is the initial value.
1717 static uint32_t last_rawSize = -1;
1718 static uint32_t last_compressedSize = -1;
1719 static float last_encodeRate = -1;
1720 static float last_differenceRate = -1;
1721 static float last_reorderRate = -1;
1722 static float last_compressRate = -1;
1723
// NOTE(review): the telem call below narrows these fields with (uint32_t) and
// (float) casts, which suggests the xrif fields are wider than the caches. If
// so, these != comparisons can be true on every call even when nothing
// changed -- confirm the field types in xrif.h.
1724 if (m_xrif->raw_size != last_rawSize || m_xrif->compressed_size != last_compressedSize || m_xrif->encode_rate != last_encodeRate || m_xrif->difference_rate != last_differenceRate ||
1725 m_xrif->reorder_rate != last_reorderRate || m_xrif->compress_rate != last_compressRate || force)
1726 {
1727 telem<telem_saving>({(uint32_t)m_xrif->raw_size, (uint32_t)m_xrif->compressed_size, (float)m_xrif->encode_rate, (float)m_xrif->difference_rate, (float)m_xrif->reorder_rate, (float)m_xrif->compress_rate});
1728
// Remember what was just recorded for the next comparison.
1729 last_rawSize = m_xrif->raw_size;
1730 last_compressedSize = m_xrif->compressed_size;
1731 last_encodeRate = m_xrif->encode_rate;
1732 last_differenceRate = m_xrif->difference_rate;
1733 last_reorderRate = m_xrif->reorder_rate;
1734 last_compressRate = m_xrif->compress_rate;
1735 }
1736
1737 return 0;
1738}
1739
1740} // namespace app
1741} // namespace MagAOX
1742
1743#endif
The base-class for MagAO-X applications.
Definition MagAOXApp.hpp:73
stateCodes::stateCodeT state()
Get the current state code.
int registerIndiPropertyNew(pcf::IndiProperty &prop, int(*)(void *, const pcf::IndiProperty &))
Register an INDI property which is exposed for others to request a New Property for.
int createStandardIndiToggleSw(pcf::IndiProperty &prop, const std::string &name, const std::string &label="", const std::string &group="")
Create a standard R/W INDI switch with a single toggle element.
int m_shutdown
Flag to signal it's time to shutdown. When not 0, the main loop exits.
int shutdown()
Get the value of the shutdown flag.
indiDriver< MagAOXApp > * m_indiDriver
The INDI driver wrapper. Constructed and initialized by execute, which starts and stops communication...
static int log(const typename logT::messageT &msg, logPrioT level=logPrio::LOG_DEFAULT)
Make a log entry.
int threadStart(std::thread &thrd, bool &thrdInit, pid_t &tpid, pcf::IndiProperty &thProp, int thrdPrio, const std::string &cpuset, const std::string &thrdName, thisPtr *thrdThis, Function &&thrdStart)
Start a thread, using this class's privileges to set priority, etc.
std::string MagAOXPath
The base path of the MagAO-X system.
Definition MagAOXApp.hpp:81
pcf::IndiProperty m_indiP_xrifStats
pcf::IndiProperty m_indiP_writing
double m_currImageTime
The write-time of the current image.
std::thread m_fgThread
A separate thread for the actual framegrabbings.
int m_fgThreadPrio
Priority of the framegrabber thread, should normally be > 0.
pid_t m_fgThreadID
F.g. thread PID.
size_t m_circBuffLength
The length of the circular buffer, in frames.
int initialize_xrif()
Initialize the xrif system.
int recordTelem(const telem_saving_state *)
uint8_t m_dataType
The ImageStreamIO type code.
unsigned m_semWaitSec
The time in whole sec to wait on the semaphore, to which m_semWaitNSec is added. Default is 0 nsec.
bool m_fgThreadInit
Synchronizer to ensure f.g. thread initializes before doing dangerous things.
int m_typeSize
The pixel byte depth.
xrif_t m_xrif_timing
The xrif compression handle for image timing data.
uint64_t m_currSaveStart
The circular buffer position at which to start saving.
virtual int appStartup()
Startup functions.
size_t m_writeChunkLength
The number of frames to write at a time.
size_t m_height
The height of the image.
uint64_t m_currSaveStopFrameNo
The frame number of the image at which saving stopped (for logging)
double m_currChunkStartTime
The write-time of the first image in the chunk.
int allocate_circbufs()
Worker function to allocate the circular buffers.
void handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
Handles SIGSEGV and SIGBUS. Sets m_restart to true.
int m_swThreadPrio
Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
int recordSavingStats(bool force=false)
pcf::IndiProperty m_swThreadProp
The property to hold the s.w. thread details.
std::thread m_swThread
A separate thread for the actual writing.
std::string m_fgCpuset
The cpuset for the framegrabber thread. Ignored if empty (the default).
virtual int appLogic()
Implementation of the FSM for the stream writer.
static void fgThreadStart(streamWriter *s)
Thread starter for the framegrabber thread, run on thread construction. Calls fgThreadExec.
std::string m_outName
The name to use for outputting files, Default is m_shmimName.
INDI_NEWCALLBACK_DECL(streamWriter, m_indiP_writing)
char * m_xrif_timing_header
Storage for the xrif timing data file header.
~streamWriter() noexcept
Destructor.
unsigned m_semWaitNSec
The time in nsec to wait on the semaphore, added to m_semWaitSec. Max is 999999999....
int allocate_xrif()
Worker function to configure and allocate the xrif handles.
int setSigSegvHandler()
Sets the handler for SIGSEGV and SIGBUS.
std::string m_swCpuset
The cpuset for the stream writer thread. Ignored if empty (the default).
xrif_t m_xrif
The xrif compression handle for image data.
virtual void setupConfig()
Setup the configuration system (called by MagAOXApp::setup())
int m_semaphoreNumber
The image structure semaphore index.
std::string m_rawimageDir
The path where files will be saved.
static void _handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
The handler called when SIGSEGV or SIGBUS is received, which will be due to ImageStreamIO server rese...
int recordSavingState(bool force=false)
int m_writing
Controls whether or not images are being written, and sequences start and stop of writing.
uint64_t m_currChunkStart
The circular buffer starting position of the current to-be-written chunk.
pid_t m_swThreadID
S.w. thread pid.
virtual void loadConfig()
load the configuration system results (called by MagAOXApp::setup())
pcf::IndiProperty m_fgThreadProp
The property to hold the f.g. thread details.
void swThreadExec()
Execute the stream writer main loop.
uint64_t m_nextChunkStart
The circular buffer starting position of the next to-be-written chunk.
static streamWriter * m_selfWriter
Static pointer to this (set in constructor). Used for getting out of the static SIGSEGV handler.
char * m_xrif_header
Storage for the xrif image data file header.
double m_maxChunkTime
The maximum time before writing regardless of number of frames.
int doEncode()
Function called when semaphore is raised to do the encode and write.
static void swThreadStart(streamWriter *s)
Thread starter for the stream writer thread, run on thread construction. Calls swThreadExec.
uint64_t m_currSaveStop
The circular buffer position at which to stop saving.
void fgThreadExec()
Execute the frame grabber main loop.
std::string m_shmimName
The name of the shared memory buffer.
dev::telemeter< streamWriter > telemeterT
size_t m_width
The width of the image.
virtual int appShutdown()
Do any needed shutdown tasks. Currently nothing in this app.
sem_t m_swSemaphore
Semaphore used to synchronize the fg thread and the sw thread.
bool m_swThreadInit
Synchronizer to ensure s.w. thread initializes before doing dangerous things.
#define MAGAOX_rawimageRelPath
The relative path to the raw images directory.
Definition paths.hpp:92
#define INDI_NEWCALLBACK_DEFN(class, prop)
Define the callback for a new property request.
#define REG_INDI_NEWPROP_NOCB(prop, propName, type)
Register a NEW INDI property with the class, with no callback.
#define INDI_NEWCALLBACK(prop)
Get the name of the static callback wrapper for a new property.
@ OPERATING
The device is operating, other than homing.
@ READY
The device is ready for operation, but is not operating.
#define INDI_VALIDATE_CALLBACK_PROPS(prop1, prop2)
Standard check for matching INDI properties in a callback.
#define INDI_IDLE
Definition indiUtils.hpp:28
#define INDI_BUSY
Definition indiUtils.hpp:30
#define INDI_OK
Definition indiUtils.hpp:29
void updateIfChanged(pcf::IndiProperty &p, const std::string &el, const T &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
Definition indiUtils.hpp:95
void updateSwitchIfChanged(pcf::IndiProperty &p, const std::string &el, const pcf::IndiElement::SwitchStateType &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
const pcf::IndiProperty & ipRecv
Definition dm.hpp:24
static constexpr logPrioT LOG_NOTICE
A normal but significant condition.
static constexpr logPrioT LOG_CRITICAL
The process can not continue and will shut down (fatal)
static constexpr logPrioT LOG_WARNING
A condition has occurred which may become an error, but the process continues.
static constexpr logPrioT LOG_DEBUG
Used for debugging.
#define XWC_SEM_WAIT_TS_RETVOID(ts, sec, nsec)
Add the wait time to a timespec for a sem_timedwait call, with no value returned on error.
Definition semUtils.hpp:20
#define NOT_WRITING
#define STOP_WRITING
#define WRITING
#define START_WRITING
A device base class which saves telemetry.
Definition telemeter.hpp:69
int appShutdown()
Perform telemeter application shutdown.
int loadConfig(appConfigurator &config)
Load the device section from an application configurator.
int appLogic()
Perform telemeter application logic.
int setupConfig(appConfigurator &config)
Setup an application configurator for the device section.
int appStartup()
Starts the telemetry log thread.
int checkRecordTimes(const telT &tel, telTs... tels)
Check the time of the last record for each telemetry type and make an entry if needed.
Software CRITICAL log entry.
Software ERR log entry.