API
 
Loading...
Searching...
No Matches
dbIngest.py
Go to the documentation of this file.
1import glob
2from purepyindi2 import device, properties, constants, messages
3from purepyindi2.messages import DefNumber, DefSwitch, DefText
4import sys
5import logging
6import xconf
7import psycopg
8from magaox.indi.device import XDevice
9from magaox.constants import DEFAULT_PREFIX
10from magaox.db.config import BaseDbDeviceConfig
11from magaox.db import Telem, FileOrigin, UserLog
12from magaox.db import ingest
13from magaox.utils import parse_iso_datetime_as_utc, creation_time_from_filename
14
15import json
16import orjson
17import xconf
18import subprocess
19import queue
20import socket
21import threading
22import pathlib
23import time
24import os.path
25import os
26import sys
27import datetime
28from datetime import timezone
29from watchdog.observers import Observer, BaseObserverSubclassCallable
30from watchdog.events import FileSystemEventHandler
31
# NOTE(review): this is a documentation-page extraction. Each line carries its
# original source line number, and indentation was lost. Source lines 43, 48 and
# 57 are absent from this listing; reconcile with the real source file.
# Watchdog event handler: stats each created/modified path and puts a FileOrigin
# record for it on `events_queue` (dbIngest passes its fs_queue, which loop() drains).
32class NewXFilesHandler(FileSystemEventHandler):
# host: stored and written as origin_host on every FileOrigin.
# events_queue: receives the FileOrigin records.
# log_name: name of the logger this handler keeps in self.log.
33 def __init__(self, host, events_queue, log_name):
34 self.host = host
35 self.events_queue = events_queue
36 self.log = logging.getLogger(log_name)
37
# Build a FileOrigin for event.src_path from its os.stat() result. creation_time
# comes from creation_time_from_filename(src_path, stat_result=...) and size_bytes
# from st_size.
# NOTE(review): `is_new_file` is not used by any visible line. Source line 43
# (between creation_time and size_bytes) is missing and presumably uses it; confirm.
38 def construct_message(self, stat_result, event, is_new_file=False):
39 return FileOrigin(
40 origin_host=self.host,
41 origin_path=event.src_path,
42 creation_time=creation_time_from_filename(event.src_path, stat_result=stat_result),
44 size_bytes=stat_result.st_size,
45 )
46
# Watchdog creation callback: stat the path and enqueue a FileOrigin with
# is_new_file=True. A path that disappears before os.stat() is silently skipped.
# NOTE(review): source line 48 is missing. As listed, the `return` on line 49 would
# make the rest of this method unreachable. Presumably line 48 is a guard that makes
# the return conditional (e.g. skipping directory events); confirm.
47 def on_created(self, event):
49 return
50 try:
51 stat_result = os.stat(event.src_path)
52 except FileNotFoundError:
53 return
54 self.events_queue.put(self.construct_message(stat_result, event, is_new_file=True))
55
# Watchdog modification callback: same as on_created but with is_new_file=False.
# NOTE(review): source line 57 is missing. As with on_created, the listed `return`
# on line 58 is presumably conditional in the real source; confirm.
56 def on_modified(self, event):
58 return
59 try:
60 stat_result = os.stat(event.src_path)
61 except FileNotFoundError:
62 return
63 self.events_queue.put(self.construct_message(stat_result, event, is_new_file=False))
64
# Seconds to sleep between retries/polls in the logdump follower threads
# (_run_logdump_thread sleeps this long while waiting for a device's files to appear).
RETRY_WAIT_SEC = 2
# NOTE(review): the two timeouts below are not referenced anywhere in the visible
# listing; presumably they are consumed elsewhere. Confirm before changing them.
CREATE_CONNECTION_TIMEOUT_SEC = 2
EXIT_TIMEOUT_SEC = 2
68
69def _run_logdump_thread(logger_name, logdump_dir, logdump_args, name, message_queue, record_class):
70 # filter what content from user_logs gets put into db
71 log = logging.getLogger(logger_name)
72 while True:
73 try:
74 args = logdump_args + ('--dir='+logdump_dir, '-J', '-f', name)
75 log.debug(f"Running logdump command {repr(' '.join(args))} for {name} in follow mode")
76 p = subprocess.Popen(args, stdout=subprocess.PIPE, stdin=subprocess.DEVNULL, stderr=subprocess.DEVNULL, text=True)
77 for line in p.stdout:
78 log.debug(f"Log line read: {line}")
79 message = record_class.from_json(name, line)
80 message_queue.put(message)
81 p.wait() # stdout is over when the process exits
82 if p.returncode != 0:
83 raise RuntimeError(f"{name} logdump exited with {p.returncode} ({repr(' '.join(args))})")
84 except Exception as e:
85 glob_pattern = logdump_dir + f"/{name}/*/{name}_*"
86 if len(glob.glob(glob_pattern + ".ndjson.gz")):
87 log.info(f"Looks like {name} is a Python app; support is TODO")
88 return
89 if len(glob.glob(glob_pattern)):
90 log.exception(f"Exception in log/telem follower for {name}")
91 else:
92 log.info(f"No files found for {name}, waiting for them to appear")
93 while not len(glob.glob(glob_pattern)):
94 time.sleep(RETRY_WAIT_SEC)
95
@xconf.config
class dbIngestConfig(BaseDbDeviceConfig):
    """Configuration for the dbIngest device: the base DB device config plus the
    process-list location and the logdump executable used by the follower threads."""

    # setup() replaces '%s' with $MAGAOX_ROLE ('' if unset) to locate the device list.
    proclist : str = xconf.field(default="/opt/MagAOX/config/proclist_%s.txt", help="Path to process list file, %s will be replaced with the value of $MAGAOX_ROLE (or an empty string if absent from the environment)")
    # First element of the command line for every telemetry/user-log follower thread.
    logdump_exe : str = xconf.field(default="/opt/MagAOX/bin/logdump", help="logdump (a.k.a. teldump) executable to use")
100
# NOTE(review): documentation-page extraction of the dbIngest device. Lines carry
# their original source line numbers and indentation was lost. Source lines 131,
# 139-141, 145, 185, 188, 192, 195, 232, 242, 244, 250, 258, 267 and 281 are absent.
# Member references appear doubled by the extractor (e.g. `self.telem_queuetelem_queue`
# presumably means `self.telem_queue`, which is declared below). Reconcile with the
# real source before relying on details.
# XDevice that follows per-device telemetry (plus user logs for "observers") with
# logdump threads, and watches data directories for new/modified files. Each loop()
# call writes the collected records to every configured database.
101class dbIngest(XDevice):
102 config : dbIngestConfig
# One (device name, follower thread) pair per device started by launch_followers.
103 telem_threads : list[tuple[str, threading.Thread]]
# Parsed Telem records from the follower threads, drained by loop().
104 telem_queue : queue.Queue
105 fs_observer : BaseObserverSubclassCallable
# FileOrigin records from NewXFilesHandler, drained by loop().
106 fs_queue : queue.Queue
# time.time() at the end of the most recent loop(); published as last_update.timestamp.
107 last_update_ts_sec : float
# Base for the records.per_sec rate. No visible line sets it (presumably lines 139-141).
108 startup_ts_sec : float
# Published as records.since_startup ('%i' format). No visible line increments it;
# presumably the missing lines in loop() do. Confirm.
109 records_since_startup : float
110 _connections : list[psycopg.Connection]
# NOTE(review): not read or written by any visible line; presumably used by the
# missing lines around connect_to_databases (242/244/281, 232). Confirm.
111 _should_connect : bool = True
112
113 #add user_log support here
114 user_log_threads : list[tuple[str, threading.Thread]]
# Parsed UserLog records (only the "observers" device feeds this), drained by loop().
115 user_log_queue : queue.Queue
116
# Start a daemon logdump follower for `dev`'s telemetry (/opt/MagAOX/telem, .bintel,
# parsed as Telem) feeding telem_queue. For dev == "observers" only, also start one for
# its binary logs (/opt/MagAOX/logs, .binlog, parsed as UserLog) feeding user_log_queue.
# Each thread gets its own child logger named "<device logger>.<dev>".
117 def launch_followers(self, dev):
118 telem_args = self.log.name + '.' + dev, '/opt/MagAOX/telem', (self.config.logdump_exe, '--ext=.bintel'), dev, self.telem_queuetelem_queue, Telem
119 telem_thread = threading.Thread(target=_run_logdump_thread, args=telem_args, daemon=True)
120 telem_thread.start()
121 self.log.debug(f"Watching {dev} for incoming telem")
122 self.telem_threadstelem_threads.append((dev, telem_thread))
123
124 if dev == "observers":
125 ULog_args = self.log.name + '.' + dev, '/opt/MagAOX/logs', (self.config.logdump_exe, '--ext=.binlog'), dev, self.user_log_queueuser_log_queue, UserLog
126 user_log_thread = threading.Thread(target=_run_logdump_thread, args= ULog_args, daemon=True)
127 user_log_thread.start()
128 self.log.debug(f"Watching {dev} for incoming user logs")
129 self.user_log_threadsuser_log_threads.append((dev, user_log_thread))
130
# NOTE(review): source line 131 is missing. Per the index entry
# "refresh_properties(self) ... dbIngest.py:131" it is the `def refresh_properties(self):`
# header for the lines below. Those lines publish last_update.timestamp, plus
# records.since_startup and records.per_sec (= records / seconds since startup_ts_sec).
132 self.properties['last_update']['timestamp'] = self.last_update_ts_seclast_update_ts_sec
133 self.update_property(self.properties['last_update'])
134 self.properties['records']['since_startup'] = self.records_since_startuprecords_since_startup
135 self.properties['records']['per_sec'] = self.records_since_startuprecords_since_startup / (time.time() - self.startup_ts_secstartup_ts_sec)
136 self.update_property(self.properties['records'])
137
# setup() runs these steps in order:
#   1. Define the read-only INDI properties "last_update" and "records".
#   2. Read device names from the role's proclist and start followers for each.
#   3. Connect to the databases and rescan the file inventory.
#   4. Start a recursive watchdog observer on each existing common_path_prefix/<data_dir>.
# Raises RuntimeError if the proclist file is missing or has a malformed line.
# NOTE(review): source lines 139-141 are missing (presumably a docstring and/or
# startup_ts_sec / records_since_startup initialisation; confirm).
138 def setup(self):
142 last_update = properties.NumberVector(name="last_update", perm=constants.PropertyPerm.READ_ONLY)
143 last_update.add_element(DefNumber(
144 name="timestamp",
146 min=0.0, max=1e200, format='%f',
147 step=1e-6,
148 ))
149 self.add_property(last_update)
150
151 records = properties.NumberVector(name="records", perm=constants.PropertyPerm.READ_ONLY)
152 records.add_element(DefNumber(
153 name="per_sec",
154 _value=0.0,
155 min=0.0, max=1e200, format='%f',
156 step=1e-6,
157 ))
158 records.add_element(DefNumber(
159 name="since_startup",
160 _value=0,
161 min=0, max=1_000_000_000, format='%i',
162 step=1,
163 ))
164 self.add_property(records)
165
166 role = os.environ.get('MAGAOX_ROLE', '')
167 proclist = pathlib.Path(self.config.proclist.replace('%s', role))
168 if not proclist.exists():
169 raise RuntimeError(f"No process list at {proclist}")
170
171 device_names = set()
172
# Proclist parsing:
#   - Blank lines and lines whose first non-blank character is '#' are skipped.
#   - Every other line must contain at least two whitespace-separated fields
#     (split(None, 1) must yield 2 parts), otherwise RuntimeError is raised.
#   - The first field is taken as the device name.
173 with proclist.open() as fh:
174 for line in fh:
175 if not line.strip():
176 continue
177 if line.strip()[0] == '#':
178 continue
179 parts = line.split(None, 1)
180 if len(parts) != 2:
181 raise RuntimeError(f"Got malformed proclist line: {repr(line)}")
182 device_names.add(parts[0])
183
# Queues must exist before launch_followers(), which hands them to the threads.
# NOTE(review): source lines 185 and 188 are missing. Nothing visible initialises
# telem_threads / user_log_threads before launch_followers appends to them;
# presumably those lines do. Confirm.
184 self.user_log_queueuser_log_queue = queue.Queue()
186
187 self.telem_queuetelem_queue = queue.Queue()
189 for dev in device_names:
190 self.launch_followers(dev)
191
# NOTE(review): source lines 192 and 195 around the initial connect are missing.
193
194 self._connections_connections = self.config.connect_to_databases()
196
197 # rescan for inventory
198 self.rescan_files()
199
200 self.fs_queuefs_queue = queue.Queue()
201 event_handler = NewXFilesHandler(self.config.hostname, self.fs_queuefs_queue, self.log.name + '.fs_observer')
202 self.fs_observerfs_observer = Observer()
203 for dirname in self.config.data_dirs:
204 dirpath = self.config.common_path_prefix / dirname
205 if not dirpath.exists():
206 self.log.debug(f"No {dirpath} to watch")
207 continue
208 self.fs_observerfs_observer.schedule(event_handler, dirpath, recursive=True)
209 self.log.info(f"Watching {dirpath} for changes")
210 self.fs_observerfs_observer.start()
211
212
213
# Update the file inventory for common_path_prefix/<data_dirs> once per database
# connection, skipping config.ignore_patterns files/directories.
# NOTE(review): any exception is logged and the method returns immediately, so
# connections after the failing one are not rescanned in this call. Source line 232
# (between the log call and `return`) is missing; the log message suggests it
# schedules a reconnect. Confirm.
214 def rescan_files(self):
215 search_paths = [self.config.common_path_prefix / name for name in self.config.data_dirs]
216 try:
217 for conn in self._connections_connections:
218 with conn.cursor() as cur:
219 # n.b. the state of the file inventory and which files
220 # are 'new' can be different depending on which
221 # database we're talking about, so we rescan once
222 # per connection
223 self.log.debug(f"Scanning for new files for {conn.info.dsn}")
224 ingest.update_file_inventory(
225 cur,
226 self.config.hostname,
227 search_paths,
228 self.config.ignore_patterns.files, self.config.ignore_patterns.directories
229 )
230 except Exception:
231 self.log.exception(f"Failed to rescan/inventory files for {conn.info.dsn}, attempting to reconnect")
233 return
234 self.log.info(f"Completed startup rescan of file inventory for {self.config.hostname} from {search_paths}")
235
# Debug-log `line` unless it contains this device's own log file name.
# NOTE(review): the membership test uses the UTF-8 *encoded* name, so `line` must be
# bytes (a str would raise TypeError). `log_file_name` is not declared in the visible
# attributes (presumably inherited from XDevice). There is no visible caller.
236 def ingest_line(self, line):
237 # avoid infinite loop of modifying log file and getting notified of the modification
238 if self.log_file_name.encode('utf8') not in line:
239 self.log.debug(line)
240
# One ingest cycle, in order:
#   1. (Re)connect to the databases, reusing existing connections.
#   2. Drain the telem, filesystem-event and user-log queues into local lists.
#   3. Write all three batches to each connection inside one transaction per connection.
#   4. Stamp last_update_ts_sec and refresh the status properties.
# NOTE(review): source lines 242 and 244 around the reconnect call are missing.
241 def loop(self):
243 self._connections_connections = self.config.connect_to_databases(existing_connections=self._connections_connections)
245
# NOTE(review): each get(timeout=0.1) blocks up to 0.1 s, and a drain loop ends only
# on queue.Empty. So every cycle idles at least ~0.3 s across the three drains. A
# producer enqueuing more often than every 0.1 s keeps a drain running indefinitely,
# starving the later queues and the DB write. A get_nowait() drain or a per-cycle cap
# would bound it. (The walrus test would also stop early on a falsy record.) Source
# lines 250, 258 and 267 inside the drains are missing.
246 telems = []
247 try:
248 while rec := self.telem_queuetelem_queue.get(timeout=0.1):
249 telems.append(rec)
251 except queue.Empty:
252 pass
253
254 fs_events = []
255 try:
256 while rec := self.fs_queuefs_queue.get(timeout=0.1):
257 fs_events.append(rec)
259 except queue.Empty:
260 pass
261
262 #update for userlog here too
263 user_logs = []
264 try:
265 while rec := self.user_log_queueuser_log_queue.get(timeout=0.1):
266 user_logs.append(rec)
268 except queue.Empty:
269 pass
270
# NOTE(review): the try wraps the whole per-connection loop. A failure on one database
# therefore skips the remaining connections. The drained lists are local and never
# re-queued, so those records are lost for every connection that did not commit.
# `e` is unused. Source line 281 is missing (presumably reconnect bookkeeping; confirm).
271 try:
272 for conn in self._connections_connections:
273 self.log.debug(f"Batching ingest for {conn.info.dsn}")
274 with conn.transaction():
275 cur = conn.cursor()
276 ingest.batch_telem(cur, telems)
277 ingest.batch_file_origins(cur, fs_events)
278 ingest.batch_user_log(cur, user_logs)
279 except Exception as e:
280 self.log.exception("Caught exception in batch ingest, reconnecting")
282
283 this_ts_sec = time.time()
284 self.last_update_ts_seclast_update_ts_sec = this_ts_sec
285 self.refresh_properties()
__init__(self, host, events_queue, log_name)
Definition dbIngest.py:33
on_modified(self, event)
Definition dbIngest.py:56
on_created(self, event)
Definition dbIngest.py:47
construct_message(self, stat_result, event, is_new_file=False)
Definition dbIngest.py:38
float records_since_startup
Definition dbIngest.py:109
float last_update_ts_sec
Definition dbIngest.py:107
BaseObserverSubclassCallable fs_observer
Definition dbIngest.py:105
launch_followers(self, dev)
Definition dbIngest.py:117
ingest_line(self, line)
Definition dbIngest.py:236
dbIngestConfig config
Definition dbIngest.py:102
refresh_properties(self)
Definition dbIngest.py:131
_run_logdump_thread(logger_name, logdump_dir, logdump_args, name, message_queue, record_class)
Definition dbIngest.py:69