9#ifndef streamWriter_hpp
10#define streamWriter_hpp
17#include <ImageStreamIO/ImageStruct.h>
18#include <ImageStreamIO/ImageStreamIO.h>
22#include <mx/sys/timeUtils.hpp>
24#include "../../libMagAOX/libMagAOX.hpp"
25#include "../../magaox_git_version.h"
27#define NOT_WRITING ( 0 )
28#define START_WRITING ( 1 )
30#define STOP_WRITING ( 3 )
381 config.add(
"writer.savePath",
389 "The absolute path where images are saved. Will use MagAO-X default if not set." );
391 config.add(
"writer.maxCircBuffLength",
393 "writer.maxCircBuffLength",
399 "The maximum length in frames of the circular buffer. Should be an integer multiple of and larger than "
400 "maxWriteChunkLength." );
402 config.add(
"writer.maxCircBuffSize",
404 "writer.maxCircBuffSize",
410 "The maximum size in MB of the circular buffer. Should be sized to hold at least 2 of the maximum "
414 "writer.maxWriteChunkLength",
416 "writer.maxWriteChunkLength",
419 "maxWriteChunkLength",
422 "The maximum length in frames of the chunks to write to disk. Should be smaller than maxCircBuffLength." );
424 config.add(
"writer.maxChunkTime",
426 "writer.maxChunkTime",
432 "The max length in seconds of the chunks to write to disk. Default is 60 sec." );
434 config.add(
"writer.stopTimeout",
436 "writer.stopTimeout",
442 "The max time in seconds to wait after a stop-writing command for the next frame before flushing the "
443 "pending data and returning to the idle state." );
445 config.add(
"writer.startWriting",
447 "writer.startWriting",
453 "Flag controlling whether writing is armed automatically at application startup. Default is false." );
455 config.add(
"writer.threadPrio",
463 "The real-time priority of the stream writer thread." );
465 config.add(
"writer.cpuset",
473 "The cpuset for the writer thread." );
475 config.add(
"writer.compress",
483 "Flag to set whether compression is used. Default true." );
485 config.add(
"writer.lz4accel",
493 "The LZ4 acceleration parameter. Larger is faster, but lower compression." );
495 config.add(
"writer.outName",
503 "The name to use for output files. Default is the shmimName." );
505 config.add(
"framegrabber.shmimName",
507 "framegrabber.shmimName",
513 "The name of the stream to monitor. From /tmp/shmimName.im.shm." );
515 config.add(
"framegrabber.semaphoreNumber",
517 "framegrabber.semaphoreNumber",
523 "The semaphore to wait on. Default is 7." );
525 config.add(
"framegrabber.semWait",
527 "framegrabber.semWait",
533 "The time in nsec to wait on the semaphore. Max is 999999999. Default is 5e8 nsec." );
535 config.add(
"framegrabber.warnMissedData",
537 "framegrabber.warnMissedData",
543 "Whether missed-data backlog summaries should be emitted at warning priority. Default is true." );
545 config.add(
"framegrabber.threadPrio",
547 "framegrabber.threadPrio",
553 "The real-time priority of the framegrabber thread." );
555 config.add(
"framegrabber.cpuset",
557 "framegrabber.cpuset",
563 "The cpuset for the framegrabber thread." );
632 std::stringstream
logss;
648 indi::addNumberElement<float>(
m_indiP_xrifStats,
"ratio", 0, 1.0, 0.0,
"%0.2f",
"Compression Ratio" );
653 std::numeric_limits<float>::max(),
656 "Differencing Rate [MB/sec]" );
661 std::numeric_limits<float>::max(),
664 "Reordering Rate [MB/sec]" );
669 std::numeric_limits<float>::max(),
672 "Compression Rate [MB/sec]" );
677 std::numeric_limits<float>::max(),
680 "Total Encoding Rate [MB/sec]" );
685 std::numeric_limits<float>::max(),
688 "Differencing Rate [f.p.s.]" );
693 std::numeric_limits<float>::max(),
696 "Reordering Rate [f.p.s.]" );
701 std::numeric_limits<float>::max(),
704 "Compression Rate [f.p.s.]" );
709 std::numeric_limits<float>::max(),
712 "Total Encoding Rate [f.p.s.]" );
729 {
__FILE__,
__LINE__,
"Write chunk length is not a divisor of circular buffer length." } );
778 double now = mx::sys::get_curr_time();
795 std::string
msg =
"stream ingest backlog: ";
951 std::cerr <<
"not compressing . . . \n";
1005 std::string
logss =
"Setting handler for SIGSEGV failed. Errno says: ";
1016 std::string
logss =
"Setting handler for SIGBUS failed. Errno says: ";
1036 static_cast<void>(
signum );
1037 static_cast<void>(
siginf );
1038 static_cast<void>(
ucont );
1046 double &circBuffSize,
1047 size_t &writeChunkLength,
1048 size_t maxCircBuffLength,
1049 double maxCircBuffSize,
1050 size_t maxWriteChunkLength,
1055 static constexpr double MB = 1048576.0;
1118 {
__FILE__,
__LINE__,
"writeChunkLength is not smaller than circBuffLength" } );
1124 {
__FILE__,
__LINE__,
"writeChunkLength is not an integer factor of circBuffLength" } );
1127 std::string
msg =
"Set circ buff length: " + std::to_string(
m_circBuffLength ) +
" frames (";
1176 std::cerr <<
"not compressing . . . \n";
1249 auto nextLog = std::chrono::steady_clock::now();
1253 const auto now = std::chrono::steady_clock::now();
1258 " sec for the writer thread to finish frame " + std::to_string(
saveStopFrameNo ),
1270 std::this_thread::sleep_for( std::chrono::milliseconds( 50 ) );
1338 mx::sys::sleep( 1 );
1356 ". Source process will need to be restarted." } );
1360 inode = buffer.st_ino;
1365 mx::sys::sleep( 1 );
1390 "No valid semaphore found for " +
m_shmimName +
". Source process will need to be restarted." } );
1407 if(
image.md[0].naxis == 3 )
1409 length =
image.md[0].size[2];
1416 <<
")" << std::endl;
1539 std::cerr <<
"new_cnt0: " <<
new_cnt0 <<
"\n";
1798 if(
image.md[0].sem <= 0 )
1836 if( buffer.st_ino !=
inode )
1839 std::cerr <<
"Restarting due to inode . . . \n";
2020 std::cerr <<
"nFrames: " <<
nFrames <<
"\n";
2026 std::cerr <<
"nothing to write\n";
2126 if(
errc != mx::error_t::noerror )
2128 std::string
msg =
"error from file::fileTimeRePath: ";
2129 msg += mx::errorMessage(
errc );
2130 msg +=
" (" + std::string( mx::errorName(
errc ) ) +
")";
2139 std::filesystem::create_directories(
fullPath );
2141 catch(
const std::filesystem::filesystem_error &
e )
2143 std::string
msg =
"filesystem_error from std::create_directories. ";
2146 msg +=
e.code().value();
2150 catch(
const std::exception &
e )
2152 std::string
msg =
"exception from std::create_directories. ";
2175 "failure writing header to file. DATA LOSS LIKELY. bytes = " + std::to_string(
bw ) } );
2181 if(
bw !=
m_xrif->compressed_size )
2187 "failure writing data to file. DATA LOSS LIKELY. bytes = " + std::to_string(
bw ) } );
2199 "failure writing timing header to file. DATA LOSS LIKELY. bytes = " + std::to_string(
bw ) } );
2211 "failure writing timing data to file. DATA LOSS LIKELY. bytes = " + std::to_string(
bw ) } );
2240(
const pcf::IndiProperty &
ipRecv )
2249 if(
ipRecv[
"toggle"].getSwitchState() == pcf::IndiElement::Off &&
2253 m_resumeAfterReconnect =
false;
2254 m_stopWriteDeadline = mx::sys::get_curr_time() + m_writeStopTimeout;
2257 if(
ipRecv[
"toggle"].getSwitchState() == pcf::IndiElement::On && m_writing ==
NOT_WRITING )
2260 m_resumeAfterReconnect =
false;
2261 m_stopWriteDeadline = 0;
2370 (
float)
m_xrif->difference_rate,
2372 (
float)
m_xrif->compress_rate } );
The base-class for XWCTk applications.
std::string basePath()
Get the base path.
stateCodes::stateCodeT state()
Get the current state code.
int registerIndiPropertyNew(pcf::IndiProperty &prop, int(*)(void *, const pcf::IndiProperty &))
Register an INDI property which is exposed for others to request a New Property for.
int createStandardIndiToggleSw(pcf::IndiProperty &prop, const std::string &name, const std::string &label="", const std::string &group="")
Create a standard R/W INDI switch with a single toggle element.
int m_shutdown
Flag to signal it's time to shutdown. When not 0, the main loop exits.
int shutdown()
Get the value of the shutdown flag.
indiDriver< MagAOXApp > * m_indiDriver
The INDI driver wrapper. Constructed and initialized by execute, which starts and stops communication...
static int log(const typename logT::messageT &msg, logPrioT level=logPrio::LOG_DEFAULT)
Make a log entry.
int threadStart(std::thread &thrd, bool &thrdInit, pid_t &tpid, pcf::IndiProperty &thProp, int thrdPrio, const std::string &cpuset, const std::string &thrdName, thisPtr *thrdThis, Function &&thrdStart)
Start a thread, using this class's privileges to set priority, etc.
pcf::IndiProperty m_indiP_xrifStats
double m_skipSummaryIntervalSec
Current interval between summary skip logs.
pcf::IndiProperty m_indiP_writing
std::thread m_fgThread
A separate thread for the actual framegrabbing.
int m_fgThreadPrio
Priority of the framegrabber thread, should normally be > 0.
pid_t m_fgThreadID
F.g. thread PID.
double m_writeStopTimeout
Seconds to wait after a stop-writing command before flushing the pending data without a new frame.
size_t m_circBuffLength
The length of the circular buffer, in frames.
int initialize_xrif()
Initialize the xrif system.
int recordTelem(const telem_saving_state *)
uint64_t * m_timingCircBuff
uint8_t m_dataType
The ImageStreamIO type code.
bool m_fgThreadInit
Synchronizer to ensure f.g. thread initializes before doing dangerous things.
int m_typeSize
The pixel byte depth.
bool m_resumeAfterReconnect
Tracks whether reconnect cleanup should resume writing immediately on the replacement stream.
uint64_t m_currImageTime
The write-time of the current image in nanoseconds.
xrif_t m_xrif_timing
The xrif compression handle for image timing data.
uint64_t m_currSaveStart
The circular buffer position at which to start saving.
std::atomic< uint64_t > m_repeatSemaphoreCount
Count of repeated semaphore wakes with unchanged frame count since the last summary log.
virtual int appStartup()
Startup functions.
size_t m_writeChunkLength
The number of frames to write at a time.
size_t m_height
The height of the image.
uint64_t m_currSaveStopFrameNo
The frame number of the image at which saving stopped (for logging)
double m_writeCompletionTimeout
Seconds to wait for the writer thread to finish a queued flush during restart cleanup.
int allocate_circbufs()
Worker function to allocate the circular buffers.
static void getCircBuffLengths(size_t &circBuffLength, double &circBuffSize, size_t &writeChunkLength, size_t maxCircBuffLength, double maxCircBuffSize, size_t maxWriteChunkLength, uint32_t width, uint32_t height, size_t typeSize)
void handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
Handles SIGSEGV and SIGBUS. Sets m_restart to true.
int m_swThreadPrio
Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
int recordSavingStats(bool force=false)
pcf::IndiProperty m_swThreadProp
The property to hold the s.w. thread details.
bool m_warnMissedData
Whether missed-data backlog summaries should be emitted as warnings instead of informational logs.
friend class streamWriter_data_test
uint64_t m_currChunkStartTime
The write-time of the first image in the chunk in nanoseconds.
std::thread m_swThread
A separate thread for the actual writing.
std::atomic< bool > m_writePending
Whether the writer thread still owns a queued save window and may touch the circular buffers.
std::string m_fgCpuset
The cpuset for the framegrabber thread. Ignored if empty (the default).
virtual int appLogic()
Implementation of the FSM for the stream writer.
static void fgThreadStart(streamWriter *s)
Thread starter, called by threadStart on thread construction. Calls fgThreadExec.
void release_circbufs()
Release the circular buffers owned by the framegrabber thread.
std::string m_outName
The name to use for outputting files, Default is m_shmimName.
size_t m_maxCircBuffLength
The maximum length of the circular buffer, in frames.
char * m_rawImageCircBuff
double m_stopWriteDeadline
INDI_NEWCALLBACK_DECL(streamWriter, m_indiP_writing)
std::atomic< uint64_t > m_skippedFrameCount
Count of skipped frames accumulated by the framegrabber thread since the last summary log.
char * m_xrif_timing_header
Storage for the xrif timing data file header.
double m_nextSkipSummaryTime
Time after which the next summary skip log may be emitted.
~streamWriter() noexcept
Destructor.
int allocate_xrif()
Worker function to configure and allocate the xrif handles.
int setSigSegvHandler()
Sets the handler for SIGSEGV and SIGBUS.
std::string m_swCpuset
The cpuset for the stream writer thread. Ignored if empty (the default).
bool m_startWriting
Whether writing should be armed automatically at application startup.
xrif_t m_xrif
The xrif compression handle for image data.
streamWriter()
Default c'tor.
virtual void setupConfig()
Setup the configuration system (called by MagAOXApp::setup())
int m_semaphoreNumber
The image structure semaphore index.
std::string m_rawimageDir
The path where files will be saved.
static void _handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
int recordSavingState(bool force=false)
uint64_t m_currChunkStart
The circular buffer starting position of the current to-be-written chunk.
pid_t m_swThreadID
S.w. thread pid.
bool waitForWriteCompletion(uint64_t saveStopFrameNo)
Wait for the writer thread to finish any queued save work before buffer teardown.
virtual void loadConfig()
load the configuration system results (called by MagAOXApp::setup())
pcf::IndiProperty m_fgThreadProp
The property to hold the f.g. thread details.
void swThreadExec()
Execute the stream writer main loop.
uint64_t m_nextChunkStart
The circular buffer starting position of the next to-be-written chunk.
static streamWriter * m_selfWriter
char * m_xrif_header
Storage for the xrif image data file header.
friend class streamWriter_test
double m_maxChunkTime
The maximum time before writing regardless of number of frames.
double m_maxCircBuffSize
The maximum size of the circular buffer in MB.
int doEncode()
Function called when semaphore is raised to do the encode and write.
static void swThreadStart(streamWriter *s)
Thread starter, called by threadStart on thread construction. Calls swThreadExec.
uint64_t m_currSaveStop
The circular buffer position at which to stop saving.
void fgThreadExec()
Execute the frame grabber main loop.
std::string m_outFilePath
The full path for the latest output file.
size_t m_maxWriteChunkLength
std::string m_shmimName
The name of the shared memory buffer.
dev::telemeter< streamWriter > telemeterT
size_t m_width
The width of the image.
double m_circBuffSize
The size of the circular buffer, in MB.
virtual int appShutdown()
Do any needed shutdown tasks. Currently nothing in this app.
sem_t m_swSemaphore
Semaphore used to synchronize the fg thread and the sw thread.
bool m_swThreadInit
Synchronizer to ensure s.w. thread initializes before doing dangerous things.
#define MAGAOX_rawimageRelPath
The relative path to the raw images directory.
#define MAGAOX_env_rawimage
Environment variable setting the relative raw image path.
#define INDI_NEWCALLBACK_DEFN(class, prop)
Define the callback for a new property request.
#define REG_INDI_NEWPROP_NOCB(prop, propName, type)
Register a NEW INDI property with the class, with no callback.
#define INDI_NEWCALLBACK(prop)
Get the name of the static callback wrapper for a new property.
#define INDI_VALIDATE_CALLBACK_PROPS(prop1, prop2)
Standard check for matching INDI properties in a callback.
void updateIfChanged(pcf::IndiProperty &p, const std::string &el, const T &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
void updateSwitchIfChanged(pcf::IndiProperty &p, const std::string &el, const pcf::IndiElement::SwitchStateType &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
const pcf::IndiProperty & ipRecv
mx::error_t fileTimeRelPath(std::string &tstamp, std::string &relPath, time_t ts_sec, long ts_nsec)
Get the timestamp and the relative path based on a time.
static constexpr logPrioT LOG_NOTICE
A normal but significant condition.
static constexpr logPrioT LOG_INFO
Informational. The info log level is the lowest level recorded during normal operations.
static constexpr logPrioT LOG_CRITICAL
The process can not continue and will shut down (fatal)
static constexpr logPrioT LOG_WARNING
A condition has occurred which may become an error, but the process continues.
static constexpr logPrioT LOG_DEBUG
Used for debugging.
#define XWC_SEM_WAIT_TS_RETVOID(ts, sec, nsec)
Add the wait time to a timespec for a sem_timedwait call, with no value returned on error.
#define xrif_allocate_reordered
#define xrif_write_header
#define xrif_set_lz4_acceleration
#define xrif_allocate_raw
A device base class which saves telemetry.
int appShutdown()
Perform telemeter application shutdown.
int loadConfig(appConfigurator &config)
Load the device section from an application configurator.
int appLogic()
Perform telemeter application logic.
int setupConfig(appConfigurator &config)
Setup an application configurator for the device section.
int appStartup()
Starts the telemetry log thread.
int checkRecordTimes(const telT &tel, telTs... tels)
Check the time of the last record for each telemetry type and make an entry if needed.
@ OPERATING
The device is operating, other than homing.
@ READY
The device is ready for operation, but is not operating.
Software CRITICAL log entry.