9#ifndef streamWriter_hpp
10#define streamWriter_hpp
12#include <ImageStreamIO/ImageStruct.h>
13#include <ImageStreamIO/ImageStreamIO.h>
17#include <mx/sys/timeUtils.hpp>
19#include "../../libMagAOX/libMagAOX.hpp"
20#include "../../magaox_git_version.h"
22#define NOT_WRITING (0)
23#define START_WRITING (1)
25#define STOP_WRITING (3)
318 config.add(
"writer.savePath",
"",
"writer.savePath", argType::Required,
"writer",
"savePath",
false,
"string",
"The absolute path where images are saved. Will use MagAO-X default if not set.");
320 config.add(
"writer.circBuffLength",
"",
"writer.circBuffLength", argType::Required,
"writer",
"circBuffLength",
false,
"size_t",
"The length in frames of the circular buffer. Should be an integer multiple of and larger than writeChunkLength.");
322 config.add(
"writer.writeChunkLength",
"",
"writer.writeChunkLength", argType::Required,
"writer",
"writeChunkLength",
false,
"size_t",
"The length in frames of the chunks to write to disk. Should be smaller than circBuffLength.");
324 config.add(
"writer.maxChunkTime",
"",
"writer.maxChunkTime", argType::Required,
"writer",
"maxChunkTime",
false,
"float",
"The max length in seconds of the chunks to write to disk. Default is 60 sec.");
326 config.add(
"writer.threadPrio",
"",
"writer.threadPrio", argType::Required,
"writer",
"threadPrio",
false,
"int",
"The real-time priority of the stream writer thread.");
328 config.add(
"writer.cpuset",
"",
"writer.cpuset", argType::Required,
"writer",
"cpuset",
false,
"int",
"The cpuset for the writer thread.");
330 config.add(
"writer.compress",
"",
"writer.compress", argType::Required,
"writer",
"compress",
false,
"bool",
"Flag to set whether compression is used. Default true.");
332 config.add(
"writer.lz4accel",
"",
"writer.lz4accel", argType::Required,
"writer",
"lz4accel",
false,
"int",
"The LZ4 acceleration parameter. Larger is faster, but lower compression.");
334 config.add(
"writer.outName",
"",
"writer.outName", argType::Required,
"writer",
"outName",
false,
"int",
"The name to use for output files. Default is the shmimName.");
336 config.add(
"framegrabber.shmimName",
"",
"framegrabber.shmimName", argType::Required,
"framegrabber",
"shmimName",
false,
"int",
"The name of the stream to monitor. From /tmp/shmimName.im.shm.");
338 config.add(
"framegrabber.semaphoreNumber",
"",
"framegrabber.semaphoreNumber", argType::Required,
"framegrabber",
"semaphoreNumber",
false,
"int",
"The semaphore to wait on. Default is 7.");
340 config.add(
"framegrabber.semWait",
"",
"framegrabber.semWait", argType::Required,
"framegrabber",
"semWait",
false,
"int",
"The time in nsec to wait on the semaphore. Max is 999999999. Default is 5e8 nsec.");
342 config.add(
"framegrabber.threadPrio",
"",
"framegrabber.threadPrio", argType::Required,
"framegrabber",
"threadPrio",
false,
"int",
"The real-time priority of the framegrabber thread.");
344 config.add(
"framegrabber.cpuset",
"",
"framegrabber.cpuset", argType::Required,
"framegrabber",
"cpuset",
false,
"string",
"The cpuset for the framegrabber thread.");
395 std::stringstream
logss;
411 indi::addNumberElement<float>(
m_indiP_xrifStats,
"ratio", 0, 1.0, 0.0,
"%0.2f",
"Compression Ratio");
413 indi::addNumberElement<float>(
m_indiP_xrifStats,
"differenceMBsec", 0, std::numeric_limits<float>::max(), 0.0,
"%0.2f",
"Differencing Rate [MB/sec]");
415 indi::addNumberElement<float>(
m_indiP_xrifStats,
"reorderMBsec", 0, std::numeric_limits<float>::max(), 0.0,
"%0.2f",
"Reordering Rate [MB/sec]");
417 indi::addNumberElement<float>(
m_indiP_xrifStats,
"compressMBsec", 0, std::numeric_limits<float>::max(), 0.0,
"%0.2f",
"Compression Rate [MB/sec]");
419 indi::addNumberElement<float>(
m_indiP_xrifStats,
"encodeMBsec", 0, std::numeric_limits<float>::max(), 0.0,
"%0.2f",
"Total Encoding Rate [MB/sec]");
421 indi::addNumberElement<float>(
m_indiP_xrifStats,
"differenceFPS", 0, std::numeric_limits<float>::max(), 0.0,
"%0.2f",
"Differencing Rate [f.p.s.]");
423 indi::addNumberElement<float>(
m_indiP_xrifStats,
"reorderFPS", 0, std::numeric_limits<float>::max(), 0.0,
"%0.2f",
"Reordering Rate [f.p.s.]");
425 indi::addNumberElement<float>(
m_indiP_xrifStats,
"compressFPS", 0, std::numeric_limits<float>::max(), 0.0,
"%0.2f",
"Compression Rate [f.p.s.]");
427 indi::addNumberElement<float>(
m_indiP_xrifStats,
"encodeFPS", 0, std::numeric_limits<float>::max(), 0.0,
"%0.2f",
"Total Encoding Rate [f.p.s.]");
582 std::cerr <<
"not compressing . . . \n";
633 std::string
logss =
"Setting handler for SIGSEGV failed. Errno says: ";
644 std::string
logss =
"Setting handler for SIGBUS failed. Errno says: ";
668 static_cast<void>(
signum);
669 static_cast<void>(
siginf);
670 static_cast<void>(
ucont);
722 std::cerr <<
"not compressing . . . \n";
893 if (
image.md[0].naxis == 3)
895 length =
image.md[0].size[2];
901 std::cerr <<
"connected"
921 if (
image.md[0].naxis > 2)
954 if (
image.md[0].naxis > 2)
966 if (
image.md[0].naxis == 3)
996 std::cerr <<
"new_cnt0: " <<
new_cnt0 <<
"\n";
1233 if (
image.md[0].sem <= 0)
1274 std::cerr <<
"Restarting due to inode . . . \n";
1474 std::cerr <<
"nFrames: " <<
nFrames <<
"\n";
1560 if (
rv !=
sizeof(
"YYYYMMDDHHMMSSNNNNNNNNN") - 1)
1627(
const pcf::IndiProperty &
ipRecv)
1641 if (
ipRecv[
"toggle"].getSwitchState() == pcf::IndiElement::On && m_writing ==
NOT_WRITING)
The base-class for MagAO-X applications.
stateCodes::stateCodeT state()
Get the current state code.
int registerIndiPropertyNew(pcf::IndiProperty &prop, int(*)(void *, const pcf::IndiProperty &))
Register an INDI property which is exposed for others to request a New Property for.
int createStandardIndiToggleSw(pcf::IndiProperty &prop, const std::string &name, const std::string &label="", const std::string &group="")
Create a standard R/W INDI switch with a single toggle element.
int m_shutdown
Flag to signal it's time to shutdown. When not 0, the main loop exits.
int shutdown()
Get the value of the shutdown flag.
indiDriver< MagAOXApp > * m_indiDriver
The INDI driver wrapper. Constructed and initialized by execute, which starts and stops communication...
static int log(const typename logT::messageT &msg, logPrioT level=logPrio::LOG_DEFAULT)
Make a log entry.
int threadStart(std::thread &thrd, bool &thrdInit, pid_t &tpid, pcf::IndiProperty &thProp, int thrdPrio, const std::string &cpuset, const std::string &thrdName, thisPtr *thrdThis, Function &&thrdStart)
Start a thread, using this class's privileges to set priority, etc.
std::string MagAOXPath
The base path of the MagAO-X system.
pcf::IndiProperty m_indiP_xrifStats
pcf::IndiProperty m_indiP_writing
double m_currImageTime
The write-time of the current image.
std::thread m_fgThread
A separate thread for the actual framegrabbing.
int m_fgThreadPrio
Priority of the framegrabber thread, should normally be > 0.
pid_t m_fgThreadID
F.g. thread PID.
size_t m_circBuffLength
The length of the circular buffer, in frames.
int initialize_xrif()
Initialize the xrif system.
int recordTelem(const telem_saving_state *)
uint64_t * m_timingCircBuff
uint8_t m_dataType
The ImageStreamIO type code.
unsigned m_semWaitSec
The time in whole sec to wait on the semaphore, to which m_semWaitNSec is added. Default is 0 nsec.
bool m_fgThreadInit
Synchronizer to ensure f.g. thread initializes before doing dangerous things.
int m_typeSize
The pixel byte depth.
xrif_t m_xrif_timing
The xrif compression handle for image timing data.
uint64_t m_currSaveStart
The circular buffer position at which to start saving.
virtual int appStartup()
Startup functions.
size_t m_writeChunkLength
The number of frames to write at a time.
size_t m_height
The height of the image.
uint64_t m_currSaveStopFrameNo
The frame number of the image at which saving stopped (for logging)
double m_currChunkStartTime
The write-time of the first image in the chunk.
int allocate_circbufs()
Worker function to allocate the circular buffers.
void handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
Handles SIGSEGV and SIGBUS. Sets m_restart to true.
int m_swThreadPrio
Priority of the stream writer thread, should normally be > 0, and <= m_fgThreadPrio.
int recordSavingStats(bool force=false)
pcf::IndiProperty m_swThreadProp
The property to hold the s.w. thread details.
std::thread m_swThread
A separate thread for the actual writing.
std::string m_fgCpuset
The cpuset for the framegrabber thread. Ignored if empty (the default).
virtual int appLogic()
Implementation of the FSM for the stream writer.
static void fgThreadStart(streamWriter *s)
Thread starter, called by threadStart on thread construction. Calls fgThreadExec.
std::string m_outName
The name to use for outputting files, Default is m_shmimName.
char * m_rawImageCircBuff
INDI_NEWCALLBACK_DECL(streamWriter, m_indiP_writing)
char * m_xrif_timing_header
Storage for the xrif timing data file header.
~streamWriter() noexcept
Destructor.
unsigned m_semWaitNSec
The time in nsec to wait on the semaphore, added to m_semWaitSec. Max is 999999999....
int allocate_xrif()
Worker function to configure and allocate the xrif handles.
int setSigSegvHandler()
Sets the handler for SIGSEGV and SIGBUS.
std::string m_swCpuset
The cpuset for the stream writer thread. Ignored if empty (the default).
xrif_t m_xrif
The xrif compression handle for image data.
streamWriter()
Default c'tor.
virtual void setupConfig()
Setup the configuration system (called by MagAOXApp::setup())
int m_semaphoreNumber
The image structure semaphore index.
std::string m_rawimageDir
The path where files will be saved.
static void _handlerSigSegv(int signum, siginfo_t *siginf, void *ucont)
The handler called when SIGSEGV or SIGBUS is received, which will be due to ImageStreamIO server rese...
int recordSavingState(bool force=false)
int m_writing
Controls whether or not images are being written, and sequences start and stop of writing.
uint64_t m_currChunkStart
The circular buffer starting position of the current to-be-written chunk.
pid_t m_swThreadID
S.w. thread pid.
virtual void loadConfig()
Load the configuration system results (called by MagAOXApp::setup())
pcf::IndiProperty m_fgThreadProp
The property to hold the f.g. thread details.
void swThreadExec()
Execute the stream writer main loop.
uint64_t m_nextChunkStart
The circular buffer starting position of the next to-be-written chunk.
static streamWriter * m_selfWriter
Static pointer to this (set in constructor). Used for getting out of the static SIGSEGV handler.
char * m_xrif_header
Storage for the xrif image data file header.
double m_maxChunkTime
The maximum time before writing regardless of number of frames.
int doEncode()
Function called when semaphore is raised to do the encode and write.
static void swThreadStart(streamWriter *s)
Thread starter, called by threadStart on thread construction. Calls swThreadExec.
uint64_t m_currSaveStop
The circular buffer position at which to stop saving.
void fgThreadExec()
Execute the frame grabber main loop.
std::string m_shmimName
The name of the shared memory buffer.
dev::telemeter< streamWriter > telemeterT
size_t m_width
The width of the image.
virtual int appShutdown()
Do any needed shutdown tasks. Currently nothing in this app.
sem_t m_swSemaphore
Semaphore used to synchronize the fg thread and the sw thread.
bool m_swThreadInit
Synchronizer to ensure s.w. thread initializes before doing dangerous things.
#define MAGAOX_rawimageRelPath
The relative path to the raw images directory.
#define INDI_NEWCALLBACK_DEFN(class, prop)
Define the callback for a new property request.
#define REG_INDI_NEWPROP_NOCB(prop, propName, type)
Register a NEW INDI property with the class, with no callback.
#define INDI_NEWCALLBACK(prop)
Get the name of the static callback wrapper for a new property.
@ OPERATING
The device is operating, other than homing.
@ READY
The device is ready for operation, but is not operating.
#define INDI_VALIDATE_CALLBACK_PROPS(prop1, prop2)
Standard check for matching INDI properties in a callback.
void updateIfChanged(pcf::IndiProperty &p, const std::string &el, const T &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
void updateSwitchIfChanged(pcf::IndiProperty &p, const std::string &el, const pcf::IndiElement::SwitchStateType &newVal, indiDriverT *indiDriver, pcf::IndiProperty::PropertyStateType newState=pcf::IndiProperty::Ok)
Update the value of the INDI element, but only if it has changed.
const pcf::IndiProperty & ipRecv
static constexpr logPrioT LOG_NOTICE
A normal but significant condition.
static constexpr logPrioT LOG_CRITICAL
The process can not continue and will shut down (fatal)
static constexpr logPrioT LOG_WARNING
A condition has occurred which may become an error, but the process continues.
static constexpr logPrioT LOG_DEBUG
Used for debugging.
#define XWC_SEM_WAIT_TS_RETVOID(ts, sec, nsec)
Add the wait time to a timespec for a sem_timedwait call, with no value returned on error.
A device base class which saves telemetry.
int appShutdown()
Perform telemeter application shutdown.
int loadConfig(appConfigurator &config)
Load the device section from an application configurator.
int appLogic()
Perform telemeter application logic.
int setupConfig(appConfigurator &config)
Setup an application configurator for the device section.
int appStartup()
Starts the telemetry log thread.
int checkRecordTimes(const telT &tel, telTs... tels)
Check the time of the last record for each telemetry type and make an entry if needed.
Software CRITICAL log entry.