2 from purepyindi2
import device, properties, constants, messages
3 from purepyindi2.messages
import DefNumber, DefSwitch, DefText
7 from magaox.indi.device
import XDevice
8 from magaox.db.config
import BaseDeviceConfig
9 from magaox.db
import Telem, FileOrigin
10 from magaox.db
import ingest
11 from magaox.utils
import parse_iso_datetime_as_utc, creation_time_from_filename
25 from datetime
import timezone
26 from watchdog.observers
import Observer, BaseObserverSubclassCallable
27 from watchdog.events
import FileSystemEventHandler
30 def __init__(self, host, events_queue, log_name):
33 self.
loglog = logging.getLogger(log_name)
36 stat_result = os.stat(event.src_path)
38 origin_host=self.
hosthost,
39 origin_path=event.src_path,
40 creation_time=creation_time_from_filename(event.src_path, stat_result=stat_result),
41 modification_time=datetime.datetime.fromtimestamp(stat_result.st_mtime),
42 size_bytes=stat_result.st_size,
46 if event.is_directory:
51 if event.is_directory:
56 CREATE_CONNECTION_TIMEOUT_SEC = 2
60 log = logging.getLogger(logger_name)
61 glob_pat = logdump_dir + f
'/{name}_*'
62 has_no_logs = len(glob.glob(glob_pat)) == 0
64 log.debug(f
"No matching files found for {glob_pat}")
66 while has_no_logs := len(glob.glob(glob_pat)) == 0:
67 time.sleep(RETRY_WAIT_SEC)
69 args = logdump_args + (
'--dir='+logdump_dir,
'-J',
'-f', name)
70 log.debug(f
"Running logdump command {repr(' '.join(args))} for {name} in follow mode")
71 p = subprocess.Popen(args, stdout=subprocess.PIPE, stdin=subprocess.DEVNULL, stderr=subprocess.DEVNULL, text=
True)
73 message = Telem.from_json(name, line)
74 message_queue.put(message)
76 raise RuntimeError(f
"{name} logdump exited with {p.returncode} ({repr(' '.join(args))})")
77 except Exception
as e:
78 log.exception(f
"Exception in log/telem follower for {name}")
82 proclist : str = xconf.field(default=
"/opt/MagAOX/config/proclist_%s.txt", help=
"Path to process list file, %s will be replaced with the value of $MAGAOX_ROLE (or an empty string if absent from the environment)")
83 logdump_exe : str = xconf.field(default=
"/opt/MagAOX/bin/logdump", help=
"logdump (a.k.a. teldump) executable to use")
86 config : dbIngestConfig
87 telem_threads : list[tuple[str, threading.Thread]]
88 fs_observer : BaseObserverSubclassCallable
89 telem_queue : queue.Queue
90 fs_queue : queue.Queue
91 last_update_ts_sec : float
92 startup_ts_sec : float
93 records_since_startup : float
96 args = self.log.name +
'.' + dev,
'/opt/MagAOX/telem', (self.config.logdump_exe,
'--ext=.bintel'), dev, self.
telem_queuetelem_queue
97 telem_thread = threading.Thread(target=_run_logdump_thread, args=args, daemon=
True)
99 self.log.debug(f
"Watching {dev} for incoming telem")
103 self.properties[
'last_update'][
'timestamp'] = self.
last_update_ts_seclast_update_ts_sec
104 self.update_property(self.properties[
'last_update'])
107 self.update_property(self.properties[
'records'])
113 last_update = properties.NumberVector(name=
"last_update", perm=constants.PropertyPerm.READ_ONLY)
114 last_update.add_element(DefNumber(
117 min=0.0, max=1e200, format=
'%f',
120 self.add_property(last_update)
122 records = properties.NumberVector(name=
"records", perm=constants.PropertyPerm.READ_ONLY)
123 records.add_element(DefNumber(
126 min=0.0, max=1e200, format=
'%f',
129 records.add_element(DefNumber(
130 name=
"since_startup",
132 min=0, max=1_000_000_000, format=
'%i',
135 self.add_property(records)
137 self.
connconn = self.config.database.connect()
139 role = os.environ.get(
'MAGAOX_ROLE',
'')
140 proclist = pathlib.Path(self.config.proclist.replace(
'%s', role))
141 if not proclist.exists():
142 raise RuntimeError(f
"No process list at {proclist}")
146 with proclist.open()
as fh:
150 if line.strip()[0] ==
'#':
152 parts = line.split(
None, 1)
154 raise RuntimeError(f
"Got malformed proclist line: {repr(line)}")
155 device_names.add(parts[0])
159 for dev
in device_names:
170 for dirpath
in self.config.data_dirs:
171 self.
fs_observerfs_observer.schedule(event_handler, dirpath, recursive=
True)
172 self.log.info(f
"Watching {dirpath} for changes")
176 with self.
connconn.cursor()
as cur:
177 ingest.update_file_inventory(cur, self.config.hostname, self.config.data_dirs)
178 self.log.info(f
"Completed startup rescan of file inventory for {self.config.hostname}")
182 if self.log_file_name.encode(
'utf8')
not in line:
188 while rec := self.
telem_queuetelem_queue.get(timeout=0.1):
196 while rec := self.
fs_queuefs_queue.get(timeout=0.1):
197 fs_events.append(rec)
202 with self.
connconn.transaction():
203 cur = self.
connconn.cursor()
204 ingest.batch_telem(cur, telems)
205 ingest.batch_file_origins(cur, fs_events)
207 this_ts_sec = time.time()
def __init__(self, host, events_queue, log_name)
def on_created(self, event)
def construct_message(self, event, is_new_file=False)
def on_modified(self, event)
def ingest_line(self, line)
def launch_follower(self, dev)
def refresh_properties(self)
def _run_logdump_thread(logger_name, logdump_dir, logdump_args, name, message_queue)