// Catch2-style test case tagged [streamWriter]. Its sections each build a
// streamWriterLifecycleTest app, feed it frames through a tempStream, and check
// what the fg harness thread records and when it posts work for the writer.
// NOTE(review): the leading integers on these lines look like line numbers fused in
// from a listing, and no braces are visible in this view — confirm against the real file.
842TEST_CASE(
"streamWriter fgThreadExec ingests stream data and manages write scheduling",
"[streamWriter]" )
// NOTE(review): the guarded content and the matching #endif are not visible in this view.
845 #ifdef STREAMWRITER_TEST_DOXYGEN_REF
// Section: publish frames into a stream built as tempStream( name, 2, 2, 8 )
// (presumably width, height, depth — TODO confirm) and check the raw-frame words,
// timing words and counters the fg thread exposes after each publish.
852 SECTION(
"cube streams populate frame arrays, skipped-frame counters, and missing timestamps" )
854 streamWriterLifecycleTest app;
855 fgHarnessScope fgScope( app );
856 streamWriterConfig cfg;
// Configuration: max circular-buffer length 8, max write-chunk length 4; both are
// checked against app.m_circBuffLength / app.m_writeChunkLength below.
858 cfg.m_shmimName = uniqueShmimName(
"cube_ingest" );
859 cfg.m_maxCircBuffLength = 8;
860 cfg.m_maxWriteChunkLength = 4;
861 cfg.m_maxChunkTime = 0.25;
862 cfg.m_semWaitNSec = 1000000;
863 cfg.m_savePath = ( std::filesystem::path(
"/tmp/streamWriter_lifecycle_test" ) /
"fg_cube" /
"raw" ).
string();
865 tempStream source( cfg.m_shmimName, 2, 2, 8 );
867 loadConfig( app,
"fg_cube_ingest", cfg );
868 REQUIRE( app.initializeFgHarness() == 0 );
869 fgScope.markActive(
true );
871 app.startFgHarnessThread();
// Wait until both circular buffers are non-null, then check the geometry and
// element type the app reports.
873 REQUIRE( waitFor( [&app]() {
return app.m_rawImageCircBuff !=
nullptr && app.m_timingCircBuff !=
nullptr; } ) );
874 REQUIRE( app.m_width == 2 );
875 REQUIRE( app.m_height == 2 );
876 REQUIRE( app.m_dataType == _DATATYPE_UINT16 );
877 REQUIRE( app.m_typeSize ==
sizeof( uint16_t ) );
878 REQUIRE( app.m_circBuffLength == 8 );
879 REQUIRE( app.m_writeChunkLength == 4 );
// First frame: published with second argument 1, third argument 100 and explicit
// timestamps. The checks expect raw words 100 and 103 at indices 0 and 3 of entry 0,
// and timing words { 1, atime sec, atime nsec, wtime sec, wtime nsec }.
881 const timespec atime0{ 111, 222 };
882 const timespec wtime0{ 333, 444 };
883 source.publishFrame( 0, 1, 100, atime0, wtime0 );
885 REQUIRE( waitFor( [&app]() {
return app.m_currImage == 1; } ) );
886 REQUIRE( rawFrameWord( app, 0, 0 ) == 100 );
887 REQUIRE( rawFrameWord( app, 0, 3 ) == 103 );
888 REQUIRE( timingWord( app, 0, 0 ) == 1 );
889 REQUIRE( timingWord( app, 0, 1 ) ==
static_cast<uint64_t
>( atime0.tv_sec ) );
890 REQUIRE( timingWord( app, 0, 2 ) ==
static_cast<uint64_t
>( atime0.tv_nsec ) );
891 REQUIRE( timingWord( app, 0, 3 ) ==
static_cast<uint64_t
>( wtime0.tv_sec ) );
892 REQUIRE( timingWord( app, 0, 4 ) ==
static_cast<uint64_t
>( wtime0.tv_nsec ) );
// Publish again with the same first two arguments (0, 1) but different data (200):
// the repeat-semaphore counter is expected to reach 1 while m_currImage stays at 1
// and entry 0 still holds the original value 100.
894 source.publishFrame( 0, 1, 200, atime0, wtime0 );
896 REQUIRE( waitFor( [&app]() {
return app.m_repeatSemaphoreCount.load() == 1; } ) );
897 REQUIRE( app.m_currImage == 1 );
898 REQUIRE( rawFrameWord( app, 0, 0 ) == 100 );
// Publish with second argument 3 (after 1) and all-zero timestamps: expects one
// skipped frame to be counted, entry 1 to hold 300..303, timing word 0 to be 3,
// a non-zero word 1, and words 3/4 to equal words 1/2.
900 const timespec missingTime{ 0, 0 };
901 source.publishFrame( 1, 3, 300, missingTime, missingTime );
903 REQUIRE( waitFor( [&app]() {
return app.m_currImage == 2; } ) );
904 REQUIRE( app.m_skippedFrameCount.load() == 1 );
905 REQUIRE( rawFrameWord( app, 1, 0 ) == 300 );
906 REQUIRE( rawFrameWord( app, 1, 3 ) == 303 );
907 REQUIRE( timingWord( app, 1, 0 ) == 3 );
908 REQUIRE( timingWord( app, 1, 1 ) != 0 );
909 REQUIRE( timingWord( app, 1, 3 ) == timingWord( app, 1, 1 ) );
910 REQUIRE( timingWord( app, 1, 4 ) == timingWord( app, 1, 2 ) );
911 REQUIRE( app.m_currImageTime != 0 );
// Section: with a max write-chunk length of 2, check when the fg thread posts the
// writer semaphore and which save range (m_currSaveStart / m_currSaveStop /
// m_currSaveStopFrameNo) it reports each time.
917 SECTION(
"cube streams post save work on chunk boundaries, timeout flushes, and stop requests" )
919 streamWriterLifecycleTest app;
920 fgHarnessScope fgScope( app );
921 streamWriterConfig cfg;
923 cfg.m_shmimName = uniqueShmimName(
"cube_writing" );
924 cfg.m_maxCircBuffLength = 8;
925 cfg.m_maxWriteChunkLength = 2;
926 cfg.m_maxChunkTime = 0.2;
927 cfg.m_writeStopTimeout = 0.05;
928 cfg.m_semWaitNSec = 1000000;
// NOTE(review): as shown, the path expression below is evaluated and discarded.
// Other sections in this file assign the same kind of expression to cfg.m_savePath,
// so an assignment line appears to be missing from this view — confirm.
930 ( std::filesystem::path(
"/tmp/streamWriter_lifecycle_test" ) /
"fg_writing" /
"raw" ).
string();
932 tempStream source( cfg.m_shmimName, 2, 2, 8 );
934 loadConfig( app,
"fg_cube_writing", cfg );
935 REQUIRE( app.initializeFgHarness() == 0 );
936 fgScope.markActive(
true );
938 app.startFgHarnessThread();
940 REQUIRE( waitFor( [&app]() {
return app.m_rawImageCircBuff !=
nullptr; } ) );
942 const timespec baseAtime = currentRealtime();
943 const timespec baseWtime = offsetTimespec( baseAtime, 1000 );
// First frame: expected to leave the app in the WRITING state.
// NOTE(review): no visible statement changes app.m_writing before this publish;
// whatever triggers writing is not shown in this view.
946 source.publishFrame( 0, 1, 10, baseAtime, baseWtime );
947 REQUIRE( waitFor( [&app]() {
return app.m_currImage == 1; } ) );
948 REQUIRE( app.m_writing ==
WRITING );
// Second frame fills the 2-frame chunk: the writer semaphore is expected to be
// posted with save range [0, 2) and stop frame number 2.
950 source.publishFrame( 1, 2, 20, offsetTimespec( baseAtime, 1000000 ), offsetTimespec( baseWtime, 1000000 ) );
951 REQUIRE( waitFor( [&app]() {
return app.writerSemaphoreValue() > 0; } ) );
952 REQUIRE( app.m_currSaveStart == 0 );
953 REQUIRE( app.m_currSaveStop == 2 );
954 REQUIRE( app.m_currSaveStopFrameNo == 2 );
955 REQUIRE( app.drainWriterSemaphore() >= 1 );
957 const timespec stopAtime = currentRealtime();
958 const timespec stopWtime = offsetTimespec( stopAtime, 1000 );
// Third frame starts the next chunk; the app is expected to still be WRITING.
960 source.publishFrame( 2, 3, 30, stopAtime, stopWtime );
961 REQUIRE( waitFor( [&app]() {
return app.m_currImage == 3; } ) );
962 REQUIRE( app.m_writing ==
WRITING );
// Set a stop-write deadline cfg.m_writeStopTimeout from now, then publish the
// fourth frame: expects a post with save range [2, 4) and stop frame number 4.
965 app.m_stopWriteDeadline = mx::sys::get_curr_time() + cfg.m_writeStopTimeout;
966 source.publishFrame( 3, 4, 40, offsetTimespec( stopAtime, 1000000 ), offsetTimespec( stopWtime, 1000000 ) );
967 REQUIRE( waitFor( [&app]() {
return app.writerSemaphoreValue() > 0; } ) );
968 REQUIRE( app.m_currSaveStart == 2 );
969 REQUIRE( app.m_currSaveStop == 4 );
970 REQUIRE( app.m_currSaveStopFrameNo == 4 );
971 REQUIRE( app.drainWriterSemaphore() >= 1 );
973 const timespec timeoutStopAtime = currentRealtime();
974 const timespec timeoutStopWtime = offsetTimespec( timeoutStopAtime, 1000 );
// Fifth frame leaves a one-frame partial chunk; the app is expected to be WRITING.
976 source.publishFrame( 4, 5, 50, timeoutStopAtime, timeoutStopWtime );
977 REQUIRE( waitFor( [&app]() {
return app.m_currImage == 5; } ) );
978 REQUIRE( app.m_writing ==
WRITING );
// Set the deadline again with no further frame: 10 ms later nothing should have
// been posted yet; afterwards a post is expected with the partial range [4, 5).
981 app.m_stopWriteDeadline = mx::sys::get_curr_time() + cfg.m_writeStopTimeout;
982 std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
983 REQUIRE( app.writerSemaphoreValue() == 0 );
984 REQUIRE( waitFor( [&app]() {
return app.writerSemaphoreValue() > 0; } ) );
985 REQUIRE( app.m_currSaveStart == 4 );
986 REQUIRE( app.m_currSaveStop == 5 );
987 REQUIRE( app.m_currSaveStopFrameNo == 5 );
988 REQUIRE( app.drainWriterSemaphore() >= 1 );
// Section: with a max write-chunk length of 4 and cfg.m_maxChunkTime = 0.2, publish a
// single frame and check that it is posted to the writer as a one-frame chunk
// without further input.
995 SECTION(
"cube streams flush partial chunks on timeout without dropping the queued frame" )
997 streamWriterLifecycleTest app;
998 fgHarnessScope fgScope( app );
999 streamWriterConfig cfg;
1001 cfg.m_shmimName = uniqueShmimName(
"cube_timeout" );
1002 cfg.m_maxCircBuffLength = 8;
1003 cfg.m_maxWriteChunkLength = 4;
1004 cfg.m_maxChunkTime = 0.2;
1005 cfg.m_semWaitNSec = 1000000;
// NOTE(review): as shown, the path expression below is evaluated and discarded.
// Other sections in this file assign the same kind of expression to cfg.m_savePath,
// so an assignment line appears to be missing from this view — confirm.
1007 ( std::filesystem::path(
"/tmp/streamWriter_lifecycle_test" ) /
"fg_timeout" /
"raw" ).
string();
1009 tempStream source( cfg.m_shmimName, 2, 2, 8 );
1011 loadConfig( app,
"fg_cube_timeout", cfg );
1012 REQUIRE( app.initializeFgHarness() == 0 );
1013 fgScope.markActive(
true );
1015 app.startFgHarnessThread();
1017 REQUIRE( waitFor( [&app]() {
return app.m_rawImageCircBuff !=
nullptr; } ) );
1019 const timespec timeoutAtime = currentRealtime();
1020 const timespec timeoutWtime = offsetTimespec( timeoutAtime, 1000 );
// Single frame: expected to be ingested with the app in the WRITING state.
// NOTE(review): no visible statement changes app.m_writing before this publish;
// whatever triggers writing is not shown in this view.
1023 source.publishFrame( 0, 1, 10, timeoutAtime, timeoutWtime );
1024 REQUIRE( waitFor( [&app]() {
return app.m_currImage == 1; } ) );
1025 REQUIRE( app.m_writing ==
WRITING );
// With no more frames, wait for exactly one semaphore post with m_writing back at
// START_WRITING; the save range is expected to be [0, 1) with stop frame number 1,
// and the buffered frame data (10..13) must still be intact.
1027 REQUIRE( waitFor( [&app]() {
return app.writerSemaphoreValue() == 1 && app.m_writing ==
START_WRITING; } ) );
1028 REQUIRE( app.m_currSaveStart == 0 );
1029 REQUIRE( app.m_currSaveStop == 1 );
1030 REQUIRE( app.m_currSaveStopFrameNo == 1 );
1031 REQUIRE( rawFrameWord( app, 0, 0 ) == 10 );
1032 REQUIRE( rawFrameWord( app, 0, 3 ) == 13 );
1033 REQUIRE( app.drainWriterSemaphore() == 1 );
1036 app.stopFgHarness();
// Section: replace the source stream (2x2 -> 3x1) while two frames sit in an
// unfinished chunk, and check that those frames are handed to the writer and that
// ingestion continues on the new stream.
1040 SECTION(
"replaced shmims flush the pending chunk and reconnect ready to keep writing" )
1042 streamWriterLifecycleTest app;
1043 fgHarnessScope fgScope( app );
1044 streamWriterConfig cfg;
// Held through unique_ptr so the stream can be replaced mid-test.
1045 std::unique_ptr<tempStream> source;
1047 cfg.m_shmimName = uniqueShmimName(
"cube_restart" );
1048 cfg.m_maxCircBuffLength = 8;
1049 cfg.m_maxWriteChunkLength = 4;
1050 cfg.m_maxChunkTime = 10.0;
1051 cfg.m_semWaitNSec = 1000000;
// NOTE(review): as shown, the path expression below is evaluated and discarded.
// Other sections in this file assign the same kind of expression to cfg.m_savePath,
// so an assignment line appears to be missing from this view — confirm.
1053 ( std::filesystem::path(
"/tmp/streamWriter_lifecycle_test" ) /
"fg_restart" /
"raw" ).
string();
1055 source = std::make_unique<tempStream>( cfg.m_shmimName, 2, 2, 8 );
1057 loadConfig( app,
"fg_cube_restart", cfg );
1058 REQUIRE( app.initializeFgHarness() == 0 );
1059 fgScope.markActive(
true );
1061 app.startFgHarnessThread();
1063 REQUIRE( waitFor( [&app]()
1064 {
return app.m_rawImageCircBuff !=
nullptr && app.m_width == 2 && app.m_height == 2; } ) );
1066 const timespec baseAtime = currentRealtime();
1067 const timespec baseWtime = offsetTimespec( baseAtime, 1000 );
1068 const timespec nextAtime = offsetTimespec( baseAtime, 1000000 );
1069 const timespec nextWtime = offsetTimespec( baseWtime, 1000000 );
// Two frames are ingested; with a chunk length of 4 no writer post is expected yet.
// NOTE(review): no visible statement changes app.m_writing before the first publish;
// whatever triggers writing is not shown in this view.
1072 source->publishFrame( 0, 1, 100, baseAtime, baseWtime );
1073 REQUIRE( waitFor( [&app]() {
return app.m_currImage == 1; } ) );
1074 REQUIRE( app.m_writing ==
WRITING );
1076 source->publishFrame( 1, 2, 200, nextAtime, nextWtime );
1077 REQUIRE( waitFor( [&app]() {
return app.m_currImage == 2; } ) );
1078 REQUIRE( app.writerSemaphoreValue() == 0 );
// Replace the stream under the same name with different arguments (3, 1, 8).
1081 source = std::make_unique<tempStream>( cfg.m_shmimName, 3, 1, 8 );
// Wait (timeout argument 3000) for a writer post with m_writing == STOP_WRITING.
// NOTE(review): this call has one more ')' than '(' as shown; an enclosing
// "REQUIRE(" appears to be missing from this view — confirm.
1084 waitFor( [&app]() {
return app.writerSemaphoreValue() > 0 && app.m_writing ==
STOP_WRITING; }, 3000 ) );
1085 REQUIRE( app.m_currSaveStart == 0 );
1086 REQUIRE( app.m_currSaveStop == 2 );
1087 REQUIRE( app.m_currSaveStopFrameNo == 2 );
// Snapshot the pending frames and compare them with the two frames published
// before the replacement.
1089 const std::vector<uint16_t> pendingRaw = copyPendingRawFrames( app );
1090 const std::vector<uint64_t> pendingTiming = copyPendingTimingFrames( app );
1092 REQUIRE( pendingRaw == std::vector<uint16_t>{ 100, 101, 102, 103, 200, 201, 202, 203 } );
// NOTE(review): as shown the expected vector has 9 words — a leading 1 followed by
// two groups of four timestamp words. Elsewhere in this file each frame's timing
// record starts with its frame number, so a "2," entry may be missing between the
// groups in this view — confirm.
1093 REQUIRE( pendingTiming == std::vector<uint64_t>{ 1,
1094 static_cast<uint64_t
>( baseAtime.tv_sec ),
1095 static_cast<uint64_t
>( baseAtime.tv_nsec ),
1096 static_cast<uint64_t
>( baseWtime.tv_sec ),
1097 static_cast<uint64_t
>( baseWtime.tv_nsec ),
1099 static_cast<uint64_t
>( nextAtime.tv_sec ),
1100 static_cast<uint64_t
>( nextAtime.tv_nsec ),
1101 static_cast<uint64_t
>( nextWtime.tv_sec ),
1102 static_cast<uint64_t
>( nextWtime.tv_nsec ) } );
// The test sets the resume flag, clears m_writePending and drains the semaphore
// itself — presumably standing in for the writer thread; verify against the harness.
1105 app.m_resumeAfterReconnect =
true;
1106 app.m_writePending =
false;
1107 REQUIRE( app.drainWriterSemaphore() >= 1 );
// Wait for the app to report the new 3x1 geometry with m_writing == START_WRITING.
// NOTE(review): this REQUIRE( waitFor( ... , is not closed as shown; a trailing
// timeout argument and ") );" appear to be missing from this view — confirm.
1109 REQUIRE( waitFor( [&app]() {
return app.m_width == 3 && app.m_height == 1 && app.m_writing ==
START_WRITING; },
1111 REQUIRE( app.writerSemaphoreValue() == 0 );
// A frame on the replacement stream is expected to land in entry 0 (m_currImage
// back to 1) with values 500..502 and timing word 0 == 101, with no writer post.
1113 const timespec reconnectedAtime = currentRealtime();
1114 const timespec reconnectedWtime = offsetTimespec( reconnectedAtime, 1000 );
1115 source->publishFrame( 0, 101, 500, reconnectedAtime, reconnectedWtime );
1117 REQUIRE( waitFor( [&app]() {
return app.m_currImage == 1; } ) );
1118 REQUIRE( app.m_writing ==
WRITING );
1119 REQUIRE( rawFrameWord( app, 0, 0 ) == 500 );
1120 REQUIRE( rawFrameWord( app, 0, 2 ) == 502 );
1121 REQUIRE( timingWord( app, 0, 0 ) == 101 );
1122 REQUIRE( app.writerSemaphoreValue() == 0 );
1124 app.stopFgHarness();
// Section: replace the source stream while frames are pending and never drain the
// writer semaphore; the app is expected to set m_shutdown rather than wait forever.
1128 SECTION(
"restart cleanup times out instead of hanging when no writer thread drains the queued flush" )
1130 streamWriterLifecycleTest app;
1131 fgHarnessScope fgScope( app );
1132 streamWriterConfig cfg;
// Held through unique_ptr so the stream can be replaced mid-test.
1133 std::unique_ptr<tempStream> source;
1135 cfg.m_shmimName = uniqueShmimName(
"cube_restart_timeout" );
1136 cfg.m_maxCircBuffLength = 8;
1137 cfg.m_maxWriteChunkLength = 4;
1138 cfg.m_maxChunkTime = 10.0;
1139 cfg.m_writeStopTimeout = 0.1;
1140 cfg.m_semWaitNSec = 1000000;
// NOTE(review): as shown, the path expression below is evaluated and discarded.
// Other sections in this file assign the same kind of expression to cfg.m_savePath,
// so an assignment line appears to be missing from this view — confirm.
1142 ( std::filesystem::path(
"/tmp/streamWriter_lifecycle_test" ) /
"fg_restart_timeout" /
"raw" ).
string();
1144 source = std::make_unique<tempStream>( cfg.m_shmimName, 2, 2, 8 );
1146 loadConfig( app,
"fg_cube_restart_timeout", cfg );
1147 REQUIRE( app.initializeFgHarness() == 0 );
1148 fgScope.markActive(
true );
// Write-completion timeout set to 0.1 directly on the app (presumably seconds,
// to keep the test short — TODO confirm units).
1149 app.m_writeCompletionTimeout = 0.1;
1151 app.startFgHarnessThread();
1153 REQUIRE( waitFor( [&app]()
1154 {
return app.m_rawImageCircBuff !=
nullptr && app.m_width == 2 && app.m_height == 2; } ) );
1156 const timespec baseAtime = currentRealtime();
1157 const timespec baseWtime = offsetTimespec( baseAtime, 1000 );
// Two frames are ingested into a 4-frame chunk: expected state is WRITING with no
// writer post yet.
// NOTE(review): no visible statement changes app.m_writing before the first publish;
// whatever triggers writing is not shown in this view.
1160 source->publishFrame( 0, 1, 100, baseAtime, baseWtime );
1161 REQUIRE( waitFor( [&app]() {
return app.m_currImage == 1; } ) );
1162 source->publishFrame( 1, 2, 200, offsetTimespec( baseAtime, 1000000 ), offsetTimespec( baseWtime, 1000000 ) );
1163 REQUIRE( waitFor( [&app]() {
return app.m_currImage == 2; } ) );
1164 REQUIRE( app.m_writing ==
WRITING );
1165 REQUIRE( app.writerSemaphoreValue() == 0 );
// Replace the stream under the same name with different arguments (3, 1, 8).
1168 source = std::make_unique<tempStream>( cfg.m_shmimName, 3, 1, 8 );
// The test never drains the semaphore here; it waits (timeout argument 3000) for
// m_shutdown to become non-zero.
1170 REQUIRE( waitFor( [&app]() {
return app.m_shutdown != 0; }, 3000 ) );
// The queued work is expected to still be outstanding: m_writePending true and the
// semaphore still posted.
1172 REQUIRE( app.m_writePending ==
true );
1173 REQUIRE( app.writerSemaphoreValue() > 0 );
1175 app.stopFgHarness();
// Section: publish one frame into a stream built as tempStream( name, 3, 2, 1 )
// (presumably width, height, depth — TODO confirm) and check the geometry, pixel
// words and timing words the fg thread records.
1179 SECTION(
"two-dimensional streams ingest metadata from the shared image header" )
1181 streamWriterLifecycleTest app;
1182 fgHarnessScope fgScope( app );
1183 streamWriterConfig cfg;
1185 cfg.m_shmimName = uniqueShmimName(
"image2d" );
1186 cfg.m_maxCircBuffLength = 8;
1187 cfg.m_maxWriteChunkLength = 4;
1188 cfg.m_maxChunkTime = 0.25;
1189 cfg.m_semWaitNSec = 1000000;
1190 cfg.m_savePath = ( std::filesystem::path(
"/tmp/streamWriter_lifecycle_test" ) /
"fg_2d" /
"raw" ).
string();
1192 tempStream source( cfg.m_shmimName, 3, 2, 1 );
1194 loadConfig( app,
"fg_2d_stream", cfg );
1195 REQUIRE( app.initializeFgHarness() == 0 );
1196 fgScope.markActive(
true );
1198 app.startFgHarnessThread();
1200 REQUIRE( waitFor( [&app]() {
return app.m_rawImageCircBuff !=
nullptr; } ) );
// One frame with second argument 21, third argument 1000 and explicit timestamps.
1202 const timespec atime{ 900, 1234 };
1203 const timespec writetime{ 901, 5678 };
1204 source.publishFrame( 0, 21, 1000, atime, writetime );
// Expected: 3x2 geometry, raw words 1000 and 1005 at indices 0 and 5 (six words per
// frame), and timing words { 21, atime sec, atime nsec, writetime sec, writetime nsec }.
1206 REQUIRE( waitFor( [&app]() {
return app.m_currImage == 1; } ) );
1207 REQUIRE( app.m_width == 3 );
1208 REQUIRE( app.m_height == 2 );
1209 REQUIRE( rawFrameWord( app, 0, 0 ) == 1000 );
1210 REQUIRE( rawFrameWord( app, 0, 5 ) == 1005 );
1211 REQUIRE( timingWord( app, 0, 0 ) == 21 );
1212 REQUIRE( timingWord( app, 0, 1 ) ==
static_cast<uint64_t
>( atime.tv_sec ) );
1213 REQUIRE( timingWord( app, 0, 2 ) ==
static_cast<uint64_t
>( atime.tv_nsec ) );
1214 REQUIRE( timingWord( app, 0, 3 ) ==
static_cast<uint64_t
>( writetime.tv_sec ) );
1215 REQUIRE( timingWord( app, 0, 4 ) ==
static_cast<uint64_t
>( writetime.tv_nsec ) );
// m_currImageTime is expected to equal the write time expressed in nanoseconds
// (tv_sec * 1e9 + tv_nsec).
1216 REQUIRE( app.m_currImageTime ==
static_cast<uint64_t
>( writetime.tv_sec ) * 1000000000ULL +
1217 static_cast<uint64_t
>( writetime.tv_nsec ) );
1219 app.stopFgHarness();