# dbIngest.py
1import glob
2from purepyindi2 import device, properties, constants, messages
3from purepyindi2.messages import DefNumber, DefSwitch, DefText
4import sys
5import logging
6import xconf
7import psycopg
8from magaox.indi.device import XDevice
9from magaox.constants import DEFAULT_PREFIX
10from magaox.db.config import BaseDbDeviceConfig
11from magaox.db import Telem, FileOrigin, UserLog
12from magaox.db import ingest
13from magaox.utils import parse_iso_datetime_as_utc, creation_time_from_filename
14
15import json
16import orjson
17import xconf
18import subprocess
19import queue
20import socket
21import threading
22import pathlib
23import time
24import os.path
25import os
26import sys
27import datetime
28from datetime import timezone
29from watchdog.observers import Observer, BaseObserverSubclassCallable
30from watchdog.events import FileSystemEventHandler
31
class NewXFilesHandler(FileSystemEventHandler):
    """Watchdog handler that converts filesystem create/modify events into
    ``FileOrigin`` records and enqueues them for database ingest."""

    def __init__(self, host, events_queue, log_name):
        self.host = host                  # recorded as origin_host on each record
        self.events_queue = events_queue  # queue.Queue drained by the ingest loop
        self.log = logging.getLogger(log_name)

    def construct_message(self, stat_result, event, is_new_file=False):
        """Build a ``FileOrigin`` record for *event* from a pre-fetched stat result.

        NOTE(review): the extracted source dropped a line between creation_time
        and size_bytes -- presumably a modification_time derived from
        stat_result.st_mtime; confirm against the upstream repository.
        """
        return FileOrigin(
            origin_host=self.host,
            origin_path=event.src_path,
            creation_time=creation_time_from_filename(event.src_path, stat_result=stat_result),
            size_bytes=stat_result.st_size,
        )

    def on_created(self, event):
        # Only files are inventoried; skip directory-creation events.
        if event.is_directory:
            return
        try:
            stat_result = os.stat(event.src_path)
        except FileNotFoundError:
            # File vanished between the event and our stat (e.g. a temp file).
            return
        self.events_queue.put(self.construct_message(stat_result, event, is_new_file=True))

    def on_modified(self, event):
        # Only files are inventoried; skip directory-modification events.
        if event.is_directory:
            return
        try:
            stat_result = os.stat(event.src_path)
        except FileNotFoundError:
            # File vanished between the event and our stat.
            return
        self.events_queue.put(self.construct_message(stat_result, event, is_new_file=False))
64
# Seconds to sleep between re-checks while waiting for log/telem files to appear.
RETRY_WAIT_SEC = 2
# Timeout (seconds) when establishing new database connections.
CREATE_CONNECTION_TIMEOUT_SEC = 2
# Grace period (seconds) allowed for threads/processes to wind down on exit.
EXIT_TIMEOUT_SEC = 2
68
def _run_logdump_thread(logger_name, logdump_dir, logdump_args, name, message_queue, record_class):
    """Follow one device's log/telem files with the external ``logdump`` tool.

    Each JSON line emitted by ``logdump -J -f`` is parsed via
    ``record_class.from_json`` and put on *message_queue*. Runs forever
    (intended for a daemon thread): waits for matching files to appear,
    launches logdump in follow mode, and restarts it after any failure.
    """
    log = logging.getLogger(logger_name)
    glob_pat = logdump_dir + f'/{name}_*'
    if not glob.glob(glob_pat):
        log.debug(f"No matching files found for {glob_pat}")
    while True:
        # Block until at least one file matching the pattern exists.
        while not glob.glob(glob_pat):
            time.sleep(RETRY_WAIT_SEC)
        args = logdump_args + ('--dir=' + logdump_dir, '-J', '-f', name)
        try:
            log.debug(f"Running logdump command {repr(' '.join(args))} for {name} in follow mode")
            # 'with' guarantees the stdout pipe is closed and the process is
            # wait()ed even if parsing raises (the original leaked the pipe).
            with subprocess.Popen(args, stdout=subprocess.PIPE, stdin=subprocess.DEVNULL,
                                  stderr=subprocess.DEVNULL, text=True) as p:
                for line in p.stdout:
                    log.debug(f"Log line read: {line}")
                    message_queue.put(record_class.from_json(name, line))
            # Popen.__exit__ calls wait(), so returncode is populated here;
            # the original checked it right after EOF, when it could be None.
            if p.returncode != 0:
                raise RuntimeError(f"{name} logdump exited with {p.returncode} ({repr(' '.join(args))})")
        except Exception:
            # Log and fall through to the retry loop; the follower must survive.
            log.exception(f"Exception in log/telem follower for {name}")
91
@xconf.config
class dbIngestConfig(BaseDbDeviceConfig):
    """Configuration for the dbIngest device (extends the shared DB device config)."""
    proclist : str = xconf.field(
        default="/opt/MagAOX/config/proclist_%s.txt",
        help="Path to process list file, %s will be replaced with the value of $MAGAOX_ROLE (or an empty string if absent from the environment)",
    )
    logdump_exe : str = xconf.field(
        default="/opt/MagAOX/bin/logdump",
        help="logdump (a.k.a. teldump) executable to use",
    )
96
class dbIngest(XDevice):
    """INDI device that follows MagAOX telemetry streams, user logs, and data
    directories, batching new records into every configured database.

    Reconstruction note: the scraped source doubled every hyperlinked
    attribute name (e.g. ``self.telem_queuetelem_queue``) and dropped a few
    lines; doubled names are de-duplicated below and dropped lines are
    restored with NOTE(review) markers where the content is inferred.
    """
    config : dbIngestConfig
    telem_threads : list[tuple[str, threading.Thread]]   # (device name, follower thread)
    telem_queue : queue.Queue
    fs_observer : BaseObserverSubclassCallable
    fs_queue : queue.Queue
    last_update_ts_sec : float      # wall-clock time of the last completed loop()
    startup_ts_sec : float          # wall-clock time setup() ran, for per-sec stats
    records_since_startup : float
    _connections : list[psycopg.Connection]
    _should_connect : bool = True

    # user_log follower state (mirrors the telem follower attributes above)
    user_log_threads : list[tuple[str, threading.Thread]]
    user_log_queue : queue.Queue

    def launch_followers(self, dev):
        """Start a daemon thread following *dev*'s telemetry; for the special
        'observers' device, also start a user-log follower."""
        telem_args = self.log.name + '.' + dev, '/opt/MagAOX/telem', (self.config.logdump_exe, '--ext=.bintel'), dev, self.telem_queue, Telem
        telem_thread = threading.Thread(target=_run_logdump_thread, args=telem_args, daemon=True)
        telem_thread.start()
        self.log.debug(f"Watching {dev} for incoming telem")
        self.telem_threads.append((dev, telem_thread))

        if dev == "observers":
            ULog_args = self.log.name + '.' + dev, '/opt/MagAOX/logs', (self.config.logdump_exe, '--ext=.binlog'), dev, self.user_log_queue, UserLog
            user_log_thread = threading.Thread(target=_run_logdump_thread, args=ULog_args, daemon=True)
            user_log_thread.start()
            self.log.debug(f"Watching {dev} for incoming user logs")
            self.user_log_threads.append((dev, user_log_thread))

    def refresh_properties(self):
        """Publish current ingest statistics as INDI number properties."""
        self.properties['last_update']['timestamp'] = self.last_update_ts_sec
        self.update_property(self.properties['last_update'])
        self.properties['records']['since_startup'] = self.records_since_startup
        self.properties['records']['per_sec'] = self.records_since_startup / (time.time() - self.startup_ts_sec)
        self.update_property(self.properties['records'])

    def setup(self):
        """Define INDI properties, start followers for every proclist device,
        connect to the databases, and begin watching the data directories.

        Raises RuntimeError if the proclist file is missing or malformed.
        """
        # NOTE(review): the scrape dropped original lines 135-137 here; these
        # initializers are inferred from refresh_properties() -- confirm upstream.
        self.startup_ts_sec = time.time()
        self.last_update_ts_sec = time.time()
        self.records_since_startup = 0

        last_update = properties.NumberVector(name="last_update", perm=constants.PropertyPerm.READ_ONLY)
        last_update.add_element(DefNumber(
            name="timestamp",
            _value=0.0,  # NOTE(review): initializer restored; dropped in the scrape
            min=0.0, max=1e200, format='%f',
            step=1e-6,
        ))
        self.add_property(last_update)

        records = properties.NumberVector(name="records", perm=constants.PropertyPerm.READ_ONLY)
        records.add_element(DefNumber(
            name="per_sec",
            _value=0.0,
            min=0.0, max=1e200, format='%f',
            step=1e-6,
        ))
        records.add_element(DefNumber(
            name="since_startup",
            _value=0,
            min=0, max=1_000_000_000, format='%i',
            step=1,
        ))
        self.add_property(records)

        role = os.environ.get('MAGAOX_ROLE', '')
        proclist = pathlib.Path(self.config.proclist.replace('%s', role))
        if not proclist.exists():
            raise RuntimeError(f"No process list at {proclist}")

        device_names = set()
        # Each non-blank, non-comment proclist line is "<device> <executable>".
        with proclist.open() as fh:
            for line in fh:
                if not line.strip():
                    continue
                if line.strip()[0] == '#':
                    continue
                parts = line.split(None, 1)
                if len(parts) != 2:
                    raise RuntimeError(f"Got malformed proclist line: {repr(line)}")
                device_names.add(parts[0])

        self.user_log_queue = queue.Queue()
        self.user_log_threads = []

        self.telem_queue = queue.Queue()
        self.telem_threads = []
        for dev in device_names:
            self.launch_followers(dev)

        self._connections = self.config.connect_to_databases()

        # rescan for inventory
        self.rescan_files()

        self.fs_queue = queue.Queue()
        event_handler = NewXFilesHandler(self.config.hostname, self.fs_queue, self.log.name + '.fs_observer')
        self.fs_observer = Observer()
        for dirname in self.config.data_dirs:
            dirpath = self.config.common_path_prefix / dirname
            if not dirpath.exists():
                self.log.debug(f"No {dirpath} to watch")
                continue
            self.fs_observer.schedule(event_handler, dirpath, recursive=True)
            self.log.info(f"Watching {dirpath} for changes")
        self.fs_observer.start()

    def rescan_files(self):
        """Update each database's file inventory from the configured data dirs.

        Failures are logged and the scan aborted; a later loop() iteration
        re-establishes connections.
        """
        search_paths = [self.config.common_path_prefix / name for name in self.config.data_dirs]
        try:
            for conn in self._connections:
                with conn.cursor() as cur:
                    # n.b. the state of the file inventory and which files
                    # are 'new' can be different depending on which
                    # database we're talking about, so we rescan once
                    # per connection
                    self.log.debug(f"Scanning for new files for {conn.info.dsn}")
                    ingest.update_file_inventory(
                        cur,
                        self.config.hostname,
                        search_paths,
                        self.config.ignore_patterns.files, self.config.ignore_patterns.directories
                    )
        except Exception:
            self.log.exception(f"Failed to rescan/inventory files for {conn.info.dsn}, attempting to reconnect")
            # NOTE(review): the scrape dropped original line 228 here; presumably
            # it flagged reconnection (e.g. self._should_connect = True) -- confirm upstream.
            return
        self.log.info(f"Completed startup rescan of file inventory for {self.config.hostname} from {search_paths}")

    def ingest_line(self, line):
        # avoid infinite loop of modifying log file and getting notified of the modification
        if self.log_file_name.encode('utf8') not in line:
            self.log.debug(line)

    def _drain_queue(self, pending_queue):
        """Drain *pending_queue* into a list, counting each record toward the
        since-startup statistics; returns the drained records."""
        records = []
        try:
            while rec := pending_queue.get(timeout=0.1):
                records.append(rec)
                # NOTE(review): the scrape dropped a line after each append in the
                # original drain loops; presumably this counter -- confirm upstream.
                self.records_since_startup += 1
        except queue.Empty:
            pass
        return records

    def loop(self):
        """One ingest cycle: refresh connections, drain all pending records,
        batch them into every database, and publish updated statistics."""
        self._connections = self.config.connect_to_databases(existing_connections=self._connections)

        telems = self._drain_queue(self.telem_queue)
        fs_events = self._drain_queue(self.fs_queue)
        user_logs = self._drain_queue(self.user_log_queue)

        try:
            for conn in self._connections:
                self.log.debug(f"Batching ingest for {conn.info.dsn}")
                # One transaction per connection: all three batches commit together.
                with conn.transaction():
                    cur = conn.cursor()
                    ingest.batch_telem(cur, telems)
                    ingest.batch_file_origins(cur, fs_events)
                    ingest.batch_user_log(cur, user_logs)
        except Exception:
            self.log.exception("Caught exception in batch ingest, reconnecting")
            # NOTE(review): the scrape dropped original line 277 here; presumably
            # a reconnect flag (e.g. self._should_connect = True) -- confirm upstream.

        this_ts_sec = time.time()
        self.last_update_ts_sec = this_ts_sec
        self.refresh_properties()
# (Generated Doxygen cross-reference index removed; not part of the source.)