API
 
Loading...
Searching...
No Matches
dbIngest.py
Go to the documentation of this file.
1import glob
2from purepyindi2 import device, properties, constants, messages
3from purepyindi2.messages import DefNumber, DefSwitch, DefText
4import sys
5import logging
6import xconf
7from magaox.indi.device import XDevice
8from magaox.constants import DEFAULT_PREFIX
9from magaox.db.config import BaseDeviceConfig
10from magaox.db import Telem, FileOrigin, UserLog
11from magaox.db import ingest
12from magaox.utils import parse_iso_datetime_as_utc, creation_time_from_filename
13
14import json
15import orjson
16import xconf
17import subprocess
18import queue
19import socket
20import threading
21import pathlib
22import time
23import os.path
24import os
25import sys
26import datetime
27from datetime import timezone
28from watchdog.observers import Observer, BaseObserverSubclassCallable
29from watchdog.events import FileSystemEventHandler
30
class NewXFilesHandler(FileSystemEventHandler):
    """Watchdog handler that converts file create/modify events into ``FileOrigin`` records.

    For every qualifying event the path is stat'ed and a record built by
    :meth:`construct_message` is put on ``events_queue``.
    """

    def __init__(self, host, events_queue, log_name):
        """Store the handler's collaborators.

        Args:
            host: value recorded as ``origin_host`` on every message.
            events_queue: object with a ``put`` method that receives the messages.
            log_name: logger name passed to ``logging.getLogger``.
        """
        self.host = host
        self.events_queue = events_queue
        self.log = logging.getLogger(log_name)

    def construct_message(self, stat_result, event, is_new_file=False):
        """Build the ``FileOrigin`` record describing ``event.src_path``.

        Args:
            stat_result: result of ``os.stat`` on the event's path.
            event: the file-system event; only ``src_path`` is read here.
            is_new_file: accepted from both callers but not used in the record.
        """
        # NOTE(review): the published listing skips one line between
        # ``creation_time`` and ``size_bytes``; confirm against the repository
        # copy that no keyword argument was lost here.
        return FileOrigin(
            origin_host=self.host,
            origin_path=event.src_path,
            creation_time=creation_time_from_filename(event.src_path, stat_result=stat_result),
            size_bytes=stat_result.st_size,
        )

    def _enqueue_event(self, event, is_new_file):
        """Stat the event's path and queue a record; shared by both event callbacks."""
        # NOTE(review): the listing showed a bare ``return`` here, which made the
        # rest of the handler unreachable. The guard below is the presumed
        # original condition -- confirm against the repository copy.
        if event.is_directory:
            return
        try:
            stat_result = os.stat(event.src_path)
        except FileNotFoundError:
            # The file vanished between the notification and the stat call.
            return
        self.events_queue.put(self.construct_message(stat_result, event, is_new_file=is_new_file))

    def on_created(self, event):
        """Queue a record for a newly created file."""
        self._enqueue_event(event, is_new_file=True)

    def on_modified(self, event):
        """Queue a record for a modified file."""
        self._enqueue_event(event, is_new_file=False)
63
# Seconds to wait before re-checking for log files or restarting a failed follower.
RETRY_WAIT_SEC = 2
CREATE_CONNECTION_TIMEOUT_SEC = 2
# Seconds to wait for logdump to exit once its stdout has reached end-of-file.
EXIT_TIMEOUT_SEC = 2


def _run_logdump_thread(logger_name, logdump_dir, logdump_args, name, message_queue, record_class):
    """Follow one device's logs with logdump and queue each parsed record; never returns.

    Intended as a thread target. The function waits until at least one file
    matching ``{logdump_dir}/{name}_*`` exists, starts logdump in follow mode
    with JSON output, converts every stdout line with
    ``record_class.from_json(name, line)`` and puts the result on
    ``message_queue``. When logdump exits or anything raises, the error is
    logged and the follower is restarted.

    Args:
        logger_name: name of the logger used for diagnostics.
        logdump_dir: directory searched for log files (string).
        logdump_args: tuple holding the executable and its leading arguments.
        name: device name; used for the glob pattern and passed to logdump.
        message_queue: object with a ``put`` method receiving parsed records.
        record_class: class exposing ``from_json(name, line)``.
    """
    log = logging.getLogger(logger_name)
    glob_pat = logdump_dir + f'/{name}_*'
    if not glob.glob(glob_pat):
        log.debug(f"No matching files found for {glob_pat}")
    while True:
        # Nothing to follow until the device has written at least one file.
        while not glob.glob(glob_pat):
            time.sleep(RETRY_WAIT_SEC)
        try:
            args = logdump_args + ('--dir='+logdump_dir, '-J', '-f', name)
            log.debug(f"Running logdump command {repr(' '.join(args))} for {name} in follow mode")
            p = subprocess.Popen(args, stdout=subprocess.PIPE, stdin=subprocess.DEVNULL, stderr=subprocess.DEVNULL, text=True)
            try:
                for line in p.stdout:
                    log.debug(f"Log line read: {line}")
                    message = record_class.from_json(name, line)
                    message_queue.put(message)
                # returncode stays None until the process is reaped, so wait
                # for it before judging success.
                returncode = p.wait(timeout=EXIT_TIMEOUT_SEC)
            finally:
                # Do not leave a follow-mode child or its pipe behind when
                # parsing or queueing failed, or the exit wait timed out.
                if p.poll() is None:
                    p.kill()
                    p.wait()
                p.stdout.close()
            if returncode != 0:
                raise RuntimeError(f"{name} logdump exited with {returncode} ({repr(' '.join(args))})")
        except Exception:
            log.exception(f"Exception in log/telem follower for {name}")
            # Back off so a persistently failing logdump does not respawn in a tight loop.
            time.sleep(RETRY_WAIT_SEC)
90
@xconf.config
class dbIngestConfig(BaseDeviceConfig):
    # Process list location; the literal "%s" is substituted with $MAGAOX_ROLE at setup time.
    proclist : str = xconf.field(
        default="/opt/MagAOX/config/proclist_%s.txt",
        help=(
            "Path to process list file, %s will be replaced with the value of "
            "$MAGAOX_ROLE (or an empty string if absent from the environment)"
        ),
    )
    # Executable used to follow telemetry and log files.
    logdump_exe : str = xconf.field(
        default="/opt/MagAOX/bin/logdump",
        help="logdump (a.k.a. teldump) executable to use",
    )
95
class dbIngest(XDevice):
    """Device that batches telemetry, user-log and file-inventory records into the database.

    ``setup`` starts one logdump follower thread per device in the process
    list plus a file-system observer; ``loop`` drains their queues and writes
    the collected records in a single transaction.
    """
    config : dbIngestConfig
    telem_threads : list[tuple[str, threading.Thread]]
    telem_queue : queue.Queue
    fs_observer : BaseObserverSubclassCallable
    fs_queue : queue.Queue
    last_update_ts_sec : float
    startup_ts_sec : float
    records_since_startup : float

    #add user_log support here
    user_log_threads : list[tuple[str, threading.Thread]]
    user_log_queue : queue.Queue

    def launch_followers(self, dev):
        """Start the daemon follower thread(s) for device ``dev``.

        Every device gets a telemetry follower; the ``observers`` device also
        gets a user-log follower.
        """
        telem_args = (
            self.log.name + '.' + dev,
            '/opt/MagAOX/telem',
            (self.config.logdump_exe, '--ext=.bintel'),
            dev,
            self.telem_queue,
            Telem,
        )
        telem_thread = threading.Thread(target=_run_logdump_thread, args=telem_args, daemon=True)
        telem_thread.start()
        self.log.debug(f"Watching {dev} for incoming telem")
        self.telem_threads.append((dev, telem_thread))

        #userLog support here
        if dev == "observers":
            ULog_args = (
                self.log.name + '.' + dev,
                '/opt/MagAOX/logs',
                (self.config.logdump_exe, '--ext=.binlog'),
                dev,
                self.user_log_queue,
                UserLog,
            )
            user_log_thread = threading.Thread(target=_run_logdump_thread, args=ULog_args, daemon=True)
            user_log_thread.start()
            self.log.debug(f"Watching {dev} for incoming user logs")
            self.user_log_threads.append((dev, user_log_thread))

    def refresh_properties(self):
        """Publish the last-update timestamp and the record counters."""
        self.properties['last_update']['timestamp'] = self.last_update_ts_sec
        self.update_property(self.properties['last_update'])
        self.properties['records']['since_startup'] = self.records_since_startup
        elapsed_sec = time.time() - self.startup_ts_sec
        # Guard against a zero (or negative, after a clock step) interval.
        self.properties['records']['per_sec'] = (
            self.records_since_startup / elapsed_sec if elapsed_sec > 0 else 0.0
        )
        self.update_property(self.properties['records'])

    def setup(self):
        """Create properties, connect to the database and start all followers and watchers.

        Raises:
            RuntimeError: if the process list file is missing or contains a
                line that does not split into two whitespace-separated parts.
        """
        # NOTE(review): the published listing skips the lines that initialise
        # these attributes; the values below are the presumed originals --
        # confirm against the repository copy.
        self.last_update_ts_sec = time.time()
        self.startup_ts_sec = time.time()
        self.records_since_startup = 0

        last_update = properties.NumberVector(name="last_update", perm=constants.PropertyPerm.READ_ONLY)
        last_update.add_element(DefNumber(
            name="timestamp",
            # NOTE(review): initial value reconstructed by analogy with the
            # other elements below; confirm.
            _value=self.last_update_ts_sec,
            min=0.0, max=1e200, format='%f',
            step=1e-6,
        ))
        self.add_property(last_update)

        records = properties.NumberVector(name="records", perm=constants.PropertyPerm.READ_ONLY)
        records.add_element(DefNumber(
            name="per_sec",
            _value=0.0,
            min=0.0, max=1e200, format='%f',
            step=1e-6,
        ))
        records.add_element(DefNumber(
            name="since_startup",
            _value=0,
            min=0, max=1_000_000_000, format='%i',
            step=1,
        ))
        self.add_property(records)

        self.conn = self.config.database.connect()

        role = os.environ.get('MAGAOX_ROLE', '')
        proclist = pathlib.Path(self.config.proclist.replace('%s', role))
        if not proclist.exists():
            raise RuntimeError(f"No process list at {proclist}")

        device_names = set()

        with proclist.open() as fh:
            for line in fh:
                stripped = line.strip()
                # Skip blank lines and comments.
                if not stripped or stripped.startswith('#'):
                    continue
                parts = line.split(None, 1)
                if len(parts) != 2:
                    raise RuntimeError(f"Got malformed proclist line: {repr(line)}")
                device_names.add(parts[0])

        #setup user log here too
        self.user_log_queue = queue.Queue()
        self.user_log_threads = []

        self.telem_queue = queue.Queue()
        self.telem_threads = []
        for dev in device_names:
            self.launch_followers(dev)

        # rescan for inventory
        self.rescan_files()

        self.fs_queue = queue.Queue()
        event_handler = NewXFilesHandler(self.config.hostname, self.fs_queue, self.log.name + '.fs_observer')
        self.fs_observer = Observer()
        for dirname in self.config.data_dirs:
            dirpath = self.config.common_path_prefix / dirname
            if not dirpath.exists():
                self.log.debug(f"No {dirpath} to watch")
                continue
            self.fs_observer.schedule(event_handler, dirpath, recursive=True)
            self.log.info(f"Watching {dirpath} for changes")
        self.fs_observer.start()

    def rescan_files(self):
        """Refresh the database's file inventory for this host from the configured data dirs."""
        search_paths = [self.config.common_path_prefix / name for name in self.config.data_dirs]
        with self.conn.cursor() as cur:
            ingest.update_file_inventory(cur, self.config.hostname, search_paths)
        self.log.info(f"Completed startup rescan of file inventory for {self.config.hostname} from {search_paths}")

    def ingest_line(self, line):
        """Debug-log ``line`` unless it mentions this device's own log file.

        The UTF-8 encoded log file name is searched for in ``line``, so
        ``line`` is presumably ``bytes`` -- TODO confirm against callers.
        """
        # avoid infinite loop of modifying log file and getting notified of the modification
        if self.log_file_name.encode('utf8') not in line:
            self.log.debug(line)

    def _drain(self, source_queue):
        """Return the records currently available on ``source_queue``.

        Stops once the queue stays empty for 0.1 s, or on the first falsy record.
        """
        drained = []
        try:
            while rec := source_queue.get(timeout=0.1):
                drained.append(rec)
        except queue.Empty:
            pass
        return drained

    def loop(self):
        """Drain all queues, write the batch in one transaction and refresh properties."""
        telems = self._drain(self.telem_queue)
        fs_events = self._drain(self.fs_queue)
        #update for userlog here too
        user_logs = self._drain(self.user_log_queue)

        # NOTE(review): the published listing skips one line after each
        # dequeue; counting every dequeued record is the presumed original
        # behaviour -- confirm against the repository copy.
        self.records_since_startup += len(telems) + len(fs_events) + len(user_logs)

        # Close the cursor when the batch is done, as rescan_files does.
        with self.conn.transaction(), self.conn.cursor() as cur:
            ingest.batch_telem(cur, telems)
            ingest.batch_file_origins(cur, fs_events)
            ingest.batch_user_log(cur, user_logs)

        this_ts_sec = time.time()
        self.last_update_ts_sec = this_ts_sec
        self.refresh_properties()
__init__(self, host, events_queue, log_name)
Definition dbIngest.py:32
on_modified(self, event)
Definition dbIngest.py:55
on_created(self, event)
Definition dbIngest.py:46
construct_message(self, stat_result, event, is_new_file=False)
Definition dbIngest.py:37
float records_since_startup
Definition dbIngest.py:104
float last_update_ts_sec
Definition dbIngest.py:102
BaseObserverSubclassCallable fs_observer
Definition dbIngest.py:100
launch_followers(self, dev)
Definition dbIngest.py:110
ingest_line(self, line)
Definition dbIngest.py:212
dbIngestConfig config
Definition dbIngest.py:97
refresh_properties(self)
Definition dbIngest.py:125
_run_logdump_thread(logger_name, logdump_dir, logdump_args, name, message_queue, record_class)
Definition dbIngest.py:68