// Register the configuration options of this shmimMonitor instance.
// Every option name is built from specificT::configSection() plus a suffix, has an
// empty short option, takes a required argument (argType::Required), and lives in
// the specificT::configSection() section of the configuration file.

// <section>.threadPrio (documented type "int"): real-time priority of the shmimMonitor thread.
308 config.add(specificT::configSection() +
".threadPrio",
"", specificT::configSection() +
".threadPrio", argType::Required, specificT::configSection(),
"threadPrio",
false,
"int",
"The real-time priority of the shmimMonitor thread.");
// <section>.cpuset (documented type "string"): cpuset for the shmimMonitor thread.
310 config.add(specificT::configSection() +
".cpuset",
"", specificT::configSection() +
".cpuset", argType::Required, specificT::configSection(),
"cpuset",
false,
"string",
"The cpuset for the shmimMonitor thread.");
// <section>.shmimName (documented type "string"): name of the ImageStreamIO shared memory image.
312 config.add(specificT::configSection() +
".shmimName",
"", specificT::configSection() +
".shmimName", argType::Required, specificT::configSection(),
"shmimName",
false,
"string",
"The name of the ImageStreamIO shared memory image. Will be used as /tmp/<shmimName>.im.shm.");
// <section>.getExistingFirst (documented type "bool"): whether an already existing image is
// loaded first instead of waiting for a new one.
314 config.add(specificT::configSection() +
".getExistingFirst",
"", specificT::configSection() +
".getExistingFirst", argType::Required, specificT::configSection(),
"getExistingFirst",
false,
"bool",
"If true an existing image is loaded. If false we wait for a new image.");
// Set the shared memory image name to the derived application's config name.
// NOTE(review): presumably this is the default that a configured shmimName overrides -- confirm
// against the config-loading code, which is not visible here.
317 m_shmimName = derived().configName();
// Build the INDI Text property "<indiPrefix>_shmimName" on the device named by
// derived().configName(). It is read-only, starts in the Idle state, and has a single
// element "name" initialized from m_shmimName.
337 m_indiP_shmimName = pcf::IndiProperty(pcf::IndiProperty::Text);
338 m_indiP_shmimName.setDevice(derived().configName());
339 m_indiP_shmimName.setName(specificT::indiPrefix() +
"_shmimName");
340 m_indiP_shmimName.setPerm(pcf::IndiProperty::ReadOnly);
341 m_indiP_shmimName.setState(pcf::IndiProperty::Idle);
342 m_indiP_shmimName.add(pcf::IndiElement(
"name"));
343 m_indiP_shmimName[
"name"] = m_shmimName;
// Register the property with the derived application, passing nullptr as the second
// argument. A negative return value is the failure case.
345 if (derived().registerIndiPropertyNew(m_indiP_shmimName,
nullptr) < 0)
// The error log below follows an SHMIMMONITOR_TEST_NOLOG guard, so it is compiled out
// when that macro is defined (the matching #endif is outside this excerpt).
347 #ifndef SHMIMMONITOR_TEST_NOLOG
348 derivedT::template log<software_error>({__FILE__, __LINE__});
// Build the INDI Number property "<indiPrefix>_frameSize" on the device named by
// derived().configName(). It is read-only, starts in the Idle state, and has two
// elements, "width" and "height", both initialized to 0.
354 m_indiP_frameSize = pcf::IndiProperty(pcf::IndiProperty::Number);
355 m_indiP_frameSize.setDevice(derived().configName());
356 m_indiP_frameSize.setName(specificT::indiPrefix() +
"_frameSize");
357 m_indiP_frameSize.setPerm(pcf::IndiProperty::ReadOnly);
358 m_indiP_frameSize.setState(pcf::IndiProperty::Idle);
359 m_indiP_frameSize.add(pcf::IndiElement(
"width"));
360 m_indiP_frameSize[
"width"] = 0;
361 m_indiP_frameSize.add(pcf::IndiElement(
"height"));
362 m_indiP_frameSize[
"height"] = 0;
// Register the property with the derived application, passing nullptr as the second
// argument. A negative return value is the failure case.
364 if (derived().registerIndiPropertyNew(m_indiP_frameSize,
nullptr) < 0)
// The error log below follows an SHMIMMONITOR_TEST_NOLOG guard, so it is compiled out
// when that macro is defined (the matching #endif is outside this excerpt).
366 #ifndef SHMIMMONITOR_TEST_NOLOG
367 derivedT::template log<software_error>({__FILE__, __LINE__});
// Install a signal action for SIGUSR1.
// NOTE(review): the lines that set the handler function and signal mask of `act` are not
// visible in this excerpt; only the flags assignment is.
373 struct sigaction act;
// SA_SIGINFO selects the three-argument (sa_sigaction) handler form.
377 act.sa_flags = SA_SIGINFO;
// sigaction() returns a negative value on failure and sets errno.
382 if (sigaction(SIGUSR1, &act, 0) < 0)
// Compose the failure message from a fixed prefix plus the errno description.
384 std::string logss =
"Setting handler for SIGUSR1 failed. Errno says: ";
385 logss += strerror(errno);
// Log as a software_error, passing errno, a second code of 0, and the composed message.
387 derivedT::template log<software_error>({__FILE__, __LINE__, errno, 0, logss});
// Start the monitor thread through the derived application's threadStart(), handing it the
// thread object, init flag, thread id, INDI property, priority and cpuset members, the
// config section name, this object, and smThreadStart as the entry point.
// A negative return value is the failure case.
392 if (derived().threadStart(m_smThread, m_smThreadInit, m_smThreadID, m_smThreadProp, m_smThreadPrio, m_smCpuset, specificT::configSection(),
this, smThreadStart) < 0)
// Log the failure as a software_error with only the source location.
394 derivedT::template log<software_error>({__FILE__, __LINE__});
// Record this thread's kernel thread id, obtained with the gettid system call.
442 m_smThreadID = syscall(SYS_gettid);
// Wait while the thread-init flag is still set, unless shutdown is requested.
445 while (m_smThreadInit ==
true && derived().shutdown() == 0)
// Outer loop: runs until shutdown is requested.
454 while (derived().shutdown() == 0)
// Wait while the application is not in the OPERATING state or no image name is set;
// a shutdown or restart request ends the wait.
456 while ((derived().state() !=
stateCodes::OPERATING || m_shmimName ==
"") && !derived().shutdown() && !m_restart)
461 if (derived().shutdown())
// Derive the stream's file name from m_shmimName and try to open that file read/write.
476 ImageStreamIO_filename(SM_fname,
sizeof(SM_fname), m_shmimName.c_str());
477 SM_fd = open(SM_fname, O_RDWR);
// Notice-level log stating the stream was not found yet and that a retry follows.
// NOTE(review): the condition guarding this log is not visible here; presumably it is the
// failure of the open() above -- confirm.
481 derivedT::template log<text_log>(
"ImageStream " + m_shmimName +
" not found (yet). Retrying . . .", logPrio::LOG_NOTICE);
// Open the image stream named m_shmimName into m_imageStream; a return of 0 selects this branch.
491 if (ImageStreamIO_openIm(&m_imageStream, m_shmimName.c_str()) == 0)
// Compare the stream's semaphore count (md[0].sem) against the semaphore number in use;
// when the count is not greater than that number the stream is closed again.
// NOTE(review): presumably the open is then retried -- the follow-up lines are not visible.
493 if (m_imageStream.md[0].sem <= m_semaphoreNumber)
495 ImageStreamIO_closeIm(&m_imageStream);
// Rebuild the stream's file name and stat() it to obtain the file's inode.
502 ImageStreamIO_filename(SM_fname,
sizeof(SM_fname), m_shmimName.c_str());
505 int rv = stat(SM_fname, &buffer);
// Critical log for the case where the inode could not be obtained, followed by closing the stream.
// NOTE(review): the check on `rv` that guards these two lines is not visible in this excerpt.
509 derivedT::template log<software_critical>({__FILE__, __LINE__, errno,
"Could not get inode for " + m_shmimName +
". Source process will need to be restarted."});
510 ImageStreamIO_closeIm(&m_imageStream);
// Remember the inode of the stream file; it is compared against a later stat() result.
513 m_inode = buffer.st_ino;
// Check the derived application's shutdown flag.
// NOTE(review): lines between this check and the close below are missing from the excerpt, so
// whether the close is conditional on it cannot be confirmed here.
528 if (derived().m_shutdown)
533 ImageStreamIO_closeIm(&m_imageStream);
// Ask ImageStreamIO for a semaphore wait index, passing the current number as the second
// argument; the result replaces m_semaphoreNumber.
537 m_semaphoreNumber = ImageStreamIO_getsemwaitindex(&m_imageStream, m_semaphoreNumber);
// A negative index means no valid semaphore was found; this is logged as critical.
539 if (m_semaphoreNumber < 0)
541 derivedT::template log<software_critical>({__FILE__, __LINE__,
"No valid semaphore found for " + m_shmimName +
". Source process will need to be restarted."});
// Informational log of the semaphore index obtained for this stream.
545 derivedT::template log<software_info>({__FILE__, __LINE__,
"got semaphore index " + std::to_string(m_semaphoreNumber) +
" for " + m_shmimName});
// Flush the selected semaphore before waiting on it.
547 ImageStreamIO_semflush(&m_imageStream, m_semaphoreNumber);
// Pointer to the selected semaphore, used by the sem_timedwait() call further on.
549 sem_t *sem = m_imageStream.semptr[m_semaphoreNumber];
// Cache the stream's data type, per-element size, and dimensions from its metadata.
551 m_dataType = m_imageStream.md[0].datatype;
552 m_typeSize = ImageStreamIO_typesize(m_dataType);
553 m_width = m_imageStream.md[0].size[0];
// Height is taken from the metadata only when the stream has more than one axis.
// NOTE(review): the else branches that set m_height/m_depth otherwise are not visible here.
555 if (m_imageStream.md[0].naxis > 1)
557 m_height = m_imageStream.md[0].size[1];
// Depth is taken from the metadata only when the stream has more than two axes.
559 if (m_imageStream.md[0].naxis > 2)
562 m_depth = m_imageStream.md[0].size[2];
// Let the derived class allocate for the new image parameters; specificT() is passed as a
// tag argument. A negative return is logged as a software_error.
576 if (derived().allocate(specificT()) < 0)
578 derivedT::template log<software_error>({__FILE__, __LINE__,
"allocation failed"});
// Dimensions read from the stream metadata, compared below against the cached values.
583 size_t snx, sny, snz;
// When m_getExistingFirst is set, and no restart or shutdown is pending, the image already in
// the stream is processed.
586 if (m_getExistingFirst && !m_restart && derived().shutdown() == 0)
// If the third dimension is non-zero, the image index is taken from the stream's cnt1 counter.
// NOTE(review): the alternative branch that sets curr_image otherwise is not visible here.
588 if (m_imageStream.md[0].size[2] > 0)
590 curr_image = m_imageStream.md[0].cnt1;
// Read the stream's data type and dimensions from its metadata.
597 atype = m_imageStream.md[0].datatype;
598 snx = m_imageStream.md[0].size[0];
// NOTE(review): sny is assigned on two separate original lines; presumably they belong to
// different naxis branches whose conditions are missing from this excerpt -- confirm.
602 sny = m_imageStream.md[0].size[1];
607 sny = m_imageStream.md[0].size[1];
608 snz = m_imageStream.md[0].size[2];
// Detect a mismatch between the stream's metadata and the cached type or dimensions.
616 if (atype != m_dataType || snx != m_width || sny != m_height || snz != m_depth)
// Byte address of the selected image: raw array base plus curr_image whole frames of
// m_width * m_height elements, each m_typeSize bytes.
621 char *curr_src = (
char *)m_imageStream.array.raw + curr_image * m_width * m_height * m_typeSize;
// Hand the image to the derived class; a negative return is logged as a software_error.
623 if (derived().processImage(curr_src, specificT()) < 0)
625 derivedT::template log<software_error>({__FILE__, __LINE__});
// Read the current CLOCK_REALTIME time into ts, which is passed to sem_timedwait() below.
// A failure is logged as critical with errno.
// NOTE(review): the lines that add the timeout to ts are not visible in this excerpt.
635 if (clock_gettime(CLOCK_REALTIME, &ts) < 0)
637 derivedT::template log<software_critical>({__FILE__, __LINE__, errno, 0,
"clock_gettime"});
// Wait on the stream semaphore until the absolute time in ts; 0 means the semaphore was taken.
643 if (sem_timedwait(sem, &ts) == 0)
// If the third dimension is non-zero, the image index is taken from the stream's cnt1 counter.
// NOTE(review): the alternative branch that sets curr_image otherwise is not visible here.
645 if (m_imageStream.md[0].size[2] > 0)
647 curr_image = m_imageStream.md[0].cnt1;
// Read the stream's data type and dimensions from its metadata.
652 atype = m_imageStream.md[0].datatype;
653 snx = m_imageStream.md[0].size[0];
// NOTE(review): sny is assigned on two separate original lines; presumably they belong to
// different naxis branches whose conditions are missing from this excerpt -- confirm.
657 sny = m_imageStream.md[0].size[1];
662 sny = m_imageStream.md[0].size[1];
663 snz = m_imageStream.md[0].size[2];
// Detect a mismatch between the stream's metadata and the cached type or dimensions.
671 if (atype != m_dataType || snx != m_width || sny != m_height || snz != m_depth)
// Byte address of the selected image: raw array base plus curr_image whole frames of
// m_width * m_height elements, each m_typeSize bytes.
679 char *curr_src = (
char *)m_imageStream.array.raw + curr_image * m_width * m_height * m_typeSize;
// Hand the image to the derived class; a negative return is logged as a software_error.
681 if (derived().processImage(curr_src, specificT()) < 0)
683 derivedT::template log<software_error>({__FILE__, __LINE__});
// Check whether the stream reports no semaphores (md[0].sem <= 0).
// NOTE(review): the action taken in that case is not visible in this excerpt.
688 if (m_imageStream.md[0].sem <= 0)
// Any errno other than ETIMEDOUT is logged as a software_error tagged "sem_timedwait";
// ETIMEDOUT itself is not logged by this check.
697 if (errno != ETIMEDOUT)
699 derivedT::template log<software_error>({__FILE__, __LINE__, errno,
"sem_timedwait"});
// Rebuild the stream's file name and try to open the file read/write again.
// NOTE(review): the handling of the open() result is not visible in this excerpt.
706 ImageStreamIO_filename(SM_fname,
sizeof(SM_fname), m_shmimName.c_str());
707 SM_fd = open(SM_fname, O_RDWR);
// stat() the file and compare its inode with the one saved in m_inode; a difference means
// the path now refers to a different file than the one originally opened.
716 int rv = stat(SM_fname, &buffer);
722 if (buffer.st_ino != m_inode)
// If a semaphore index is held, reset its entry in the stream's semReadPID table to 0
// before closing the stream.
734 if (m_semaphoreNumber >= 0)
735 m_imageStream.semReadPID[m_semaphoreNumber] = 0;
736 ImageStreamIO_closeIm(&m_imageStream);
// Close the image stream.
// NOTE(review): presumably this is the close on thread exit -- the enclosing control flow is
// not visible here.
749 ImageStreamIO_closeIm(&m_imageStream);