API
dbIngest.py
Go to the documentation of this file.
1 import glob
2 from purepyindi2 import device, properties, constants, messages
3 from purepyindi2.messages import DefNumber, DefSwitch, DefText
4 import sys
5 import logging
6 import xconf
7 from magaox.indi.device import XDevice
8 from magaox.constants import DEFAULT_PREFIX
9 from magaox.db.config import BaseDeviceConfig
10 from magaox.db import Telem, FileOrigin, UserLog
11 from magaox.db import ingest
12 from magaox.utils import parse_iso_datetime_as_utc, creation_time_from_filename
13 
14 import json
15 import orjson
16 import xconf
17 import subprocess
18 import queue
19 import socket
20 import threading
21 import pathlib
22 import time
23 import os.path
24 import os
25 import sys
26 import datetime
27 from datetime import timezone
28 from watchdog.observers import Observer, BaseObserverSubclassCallable
29 from watchdog.events import FileSystemEventHandler
30 
class NewXFilesHandler(FileSystemEventHandler):
    """Watchdog handler that turns file create/modify events into ``FileOrigin`` records.

    Every non-directory event is ``os.stat``-ed and the resulting ``FileOrigin``
    is put on ``events_queue``; the device's main loop batches those records
    into the database.

    Parameters
    ----------
    host : str
        Value recorded as ``origin_host`` on every record.
    events_queue : queue.Queue
        Destination for constructed ``FileOrigin`` records.
    log_name : str
        Name of the ``logging`` logger used by this handler.
    """

    def __init__(self, host, events_queue, log_name):
        super().__init__()
        self.host = host
        self.events_queue = events_queue
        self.log = logging.getLogger(log_name)

    def construct_message(self, stat_result, event, is_new_file=False):
        """Build a ``FileOrigin`` for ``event.src_path`` from its ``os.stat`` result.

        ``is_new_file`` is accepted for interface compatibility; it is not
        currently stored on the record.
        """
        return FileOrigin(
            origin_host=self.host,
            origin_path=event.src_path,
            creation_time=creation_time_from_filename(event.src_path, stat_result=stat_result),
            # Timezone-aware UTC rather than a naive local-time datetime, so the
            # value is unambiguous when stored. NOTE(review): assumes the
            # creation_time helper likewise yields UTC — confirm.
            modification_time=datetime.datetime.fromtimestamp(stat_result.st_mtime, tz=timezone.utc),
            size_bytes=stat_result.st_size,
        )

    def _enqueue(self, event, is_new_file):
        """Stat the event's path and queue a record; skip directories and vanished files."""
        if event.is_directory:
            return
        try:
            stat_result = os.stat(event.src_path)
        except FileNotFoundError:
            # The file disappeared between the event and the stat; nothing to record.
            return
        self.events_queue.put(self.construct_message(stat_result, event, is_new_file=is_new_file))

    def on_created(self, event):
        """Queue a ``FileOrigin`` for a newly created file."""
        self._enqueue(event, is_new_file=True)

    def on_modified(self, event):
        """Queue a ``FileOrigin`` for a modified file."""
        self._enqueue(event, is_new_file=False)
63 
# Retry / poll interval, in seconds, used by the logdump follower threads.
RETRY_WAIT_SEC: int = 2
# NOTE(review): the two timeouts below are not referenced elsewhere in this
# module as shown; confirm before removing.
CREATE_CONNECTION_TIMEOUT_SEC: int = 2
EXIT_TIMEOUT_SEC: int = 2
67 
def _run_logdump_thread(logger_name, logdump_dir, logdump_args, name, message_queue, record_class):
    """Follow ``logdump`` output for one device forever, queueing parsed records.

    Waits until a file matching ``{logdump_dir}/{name}_*`` exists, then runs
    ``logdump_args + ('--dir=<logdump_dir>', '-J', '-f', name)`` and converts
    each stdout line with ``record_class.from_json(name, line)``, putting the
    result on ``message_queue``. Whenever the subprocess exits or an error
    occurs, the subprocess is killed/reaped and a new one is started after
    ``RETRY_WAIT_SEC`` seconds. Never returns; meant as a daemon thread target.

    Parameters
    ----------
    logger_name : str
        Name of the logger to report on.
    logdump_dir : str
        Directory holding the device's log/telemetry files.
    logdump_args : tuple of str
        logdump executable followed by any leading arguments (e.g. ``--ext=``).
    name : str
        Device name: used in the file glob, passed to logdump and to ``from_json``.
    message_queue : queue.Queue
        Destination for parsed records.
    record_class
        Class exposing ``from_json(name, line)`` (``Telem`` / ``UserLog`` here).
    """
    log = logging.getLogger(logger_name)
    glob_pat = logdump_dir + f'/{name}_*'
    if not glob.glob(glob_pat):
        log.debug(f"No matching files found for {glob_pat}")
    while True:
        # Wait for the device to produce at least one matching file.
        while not glob.glob(glob_pat):
            time.sleep(RETRY_WAIT_SEC)
        proc = None
        try:
            args = tuple(logdump_args) + ('--dir=' + logdump_dir, '-J', '-f', name)
            log.debug(f"Running logdump command {repr(' '.join(args))} for {name} in follow mode")
            proc = subprocess.Popen(args, stdout=subprocess.PIPE, stdin=subprocess.DEVNULL, stderr=subprocess.DEVNULL, text=True)
            for line in proc.stdout:
                log.debug("Log line read: %s", line)
                try:
                    message = record_class.from_json(name, line)
                except Exception:
                    # One malformed line should not tear down the whole follower.
                    log.exception(f"Skipping unparseable {name} record: {line!r}")
                    continue
                message_queue.put(message)
            # returncode is only populated by wait()/poll(); reading the
            # attribute directly after EOF would always give None.
            returncode = proc.wait()
            if returncode != 0:
                raise RuntimeError(f"{name} logdump exited with {returncode} ({repr(' '.join(args))})")
            log.debug(f"{name} logdump exited cleanly; restarting")
        except Exception:
            log.exception(f"Exception in log/telem follower for {name}")
        finally:
            # Never leave a follow-mode logdump running (or its pipe open)
            # before starting a replacement.
            if proc is not None:
                if proc.poll() is None:
                    proc.kill()
                proc.wait()
                proc.stdout.close()
        # Back off so a logdump that fails immediately does not busy-loop.
        time.sleep(RETRY_WAIT_SEC)
90 
@xconf.config
class dbIngestConfig(BaseDeviceConfig):
    """Configuration for the ``dbIngest`` device, on top of ``BaseDeviceConfig``."""

    # Process list template; '%s' is substituted with $MAGAOX_ROLE at setup.
    proclist : str = xconf.field(
        default="/opt/MagAOX/config/proclist_%s.txt",
        help="Path to process list file, %s will be replaced with the value of $MAGAOX_ROLE (or an empty string if absent from the environment)",
    )
    # Binary used to follow .bintel / .binlog files.
    logdump_exe : str = xconf.field(
        default="/opt/MagAOX/bin/logdump",
        help="logdump (a.k.a. teldump) executable to use",
    )
95 
class dbIngest(XDevice):
    """Device that streams telemetry, user logs, and file events into the database.

    ``setup`` reads the role's process list, starts a logdump follower thread
    per listed device for telemetry (plus a user-log follower for the
    ``observers`` device), rescans the file inventory, and starts a watchdog
    observer over the configured data directories. Each ``loop`` call drains
    the three queues, inserts the records in one transaction, and publishes
    ingest statistics as INDI properties.
    """
    config : dbIngestConfig
    telem_threads : list[tuple[str, threading.Thread]]
    telem_queue : queue.Queue
    fs_observer : BaseObserverSubclassCallable
    fs_queue : queue.Queue
    last_update_ts_sec : float
    startup_ts_sec : float
    records_since_startup : float

    # User log support (followed only for the "observers" device)
    user_log_threads : list[tuple[str, threading.Thread]]
    user_log_queue : queue.Queue

    # Cap on records taken from any one queue per loop() call, so a queue that
    # never goes quiet cannot postpone the database commit indefinitely.
    MAX_BATCH_RECORDS = 10_000
    # Longest time loop() blocks waiting for the next record on a queue.
    QUEUE_GET_TIMEOUT_SEC = 0.1

    def _start_follower(self, dev, logdump_dir, ext, out_queue, record_class):
        """Start and return a daemon thread running ``_run_logdump_thread`` for ``dev``."""
        follower_args = (
            self.log.name + '.' + dev,
            logdump_dir,
            (self.config.logdump_exe, f'--ext={ext}'),
            dev,
            out_queue,
            record_class,
        )
        thread = threading.Thread(target=_run_logdump_thread, args=follower_args, daemon=True, name=f"{dev}{ext}")
        thread.start()
        return thread

    def launch_followers(self, dev):
        """Start the logdump follower thread(s) for device ``dev``.

        Telemetry (``.bintel`` under ``/opt/MagAOX/telem``) is followed for every
        device; user logs (``.binlog`` under ``/opt/MagAOX/logs``) only for
        ``observers``.
        """
        telem_thread = self._start_follower(dev, '/opt/MagAOX/telem', '.bintel', self.telem_queue, Telem)
        self.log.debug(f"Watching {dev} for incoming telem")
        self.telem_threads.append((dev, telem_thread))

        if dev == "observers":
            user_log_thread = self._start_follower(dev, '/opt/MagAOX/logs', '.binlog', self.user_log_queue, UserLog)
            self.log.debug(f"Watching {dev} for incoming user logs")
            self.user_log_threads.append((dev, user_log_thread))

    def refresh_properties(self):
        """Publish the last-update timestamp and the record count/rate properties."""
        self.properties['last_update']['timestamp'] = self.last_update_ts_sec
        self.update_property(self.properties['last_update'])
        elapsed_sec = time.time() - self.startup_ts_sec
        # Guard against a zero interval right after startup.
        self.records_per_sec = self.records_since_startup / elapsed_sec if elapsed_sec > 0 else 0.0
        self.properties['records']['since_startup'] = self.records_since_startup
        self.properties['records']['per_sec'] = self.records_per_sec
        self.update_property(self.properties['records'])

    def _setup_properties(self):
        """Create the read-only ``last_update`` and ``records`` number vectors."""
        last_update = properties.NumberVector(name="last_update", perm=constants.PropertyPerm.READ_ONLY)
        last_update.add_element(DefNumber(
            name="timestamp",
            _value=self.last_update_ts_sec,
            min=0.0, max=1e200, format='%f',
            step=1e-6,
        ))
        self.add_property(last_update)

        records = properties.NumberVector(name="records", perm=constants.PropertyPerm.READ_ONLY)
        records.add_element(DefNumber(
            name="per_sec",
            _value=0.0,
            min=0.0, max=1e200, format='%f',
            step=1e-6,
        ))
        records.add_element(DefNumber(
            name="since_startup",
            _value=0,
            min=0, max=1_000_000_000, format='%i',
            step=1,
        ))
        self.add_property(records)

    def _read_proclist_devices(self):
        """Return the set of device names (first column) from this role's process list.

        Raises RuntimeError if the file is missing or a line is malformed.
        """
        role = os.environ.get('MAGAOX_ROLE', '')
        proclist = pathlib.Path(self.config.proclist.replace('%s', role))
        if not proclist.exists():
            raise RuntimeError(f"No process list at {proclist}")
        device_names = set()
        with proclist.open() as fh:
            for line in fh:
                stripped = line.strip()
                # Skip blank lines and comment lines.
                if not stripped or stripped.startswith('#'):
                    continue
                parts = line.split(None, 1)
                if len(parts) != 2:
                    raise RuntimeError(f"Got malformed proclist line: {repr(line)}")
                device_names.add(parts[0])
        return device_names

    def _start_fs_observer(self):
        """Recursively watch each existing configured data directory for file changes."""
        self.fs_queue = queue.Queue()
        event_handler = NewXFilesHandler(self.config.hostname, self.fs_queue, self.log.name + '.fs_observer')
        self.fs_observer = Observer()
        for dirname in self.config.data_dirs:
            dirpath = self.config.common_path_prefix / dirname
            if not dirpath.exists():
                self.log.debug(f"No {dirpath} to watch")
                continue
            self.fs_observer.schedule(event_handler, dirpath, recursive=True)
            self.log.info(f"Watching {dirpath} for changes")
        self.fs_observer.start()

    def setup(self):
        """Create properties, connect to the database, and start all watchers.

        Raises RuntimeError if the process list is missing or malformed.
        """
        self.last_update_ts_sec = time.time()
        self.records_since_startup = 0
        self.records_per_sec = 0.0
        self._setup_properties()

        self.conn = self.config.database.connect()

        device_names = self._read_proclist_devices()

        self.user_log_queue = queue.Queue()
        self.user_log_threads = []
        self.telem_queue = queue.Queue()
        self.telem_threads = []
        # Sorted for a deterministic startup order.
        for dev in sorted(device_names):
            self.launch_followers(dev)

        self.startup_ts_sec = time.time()

        # Rescan for inventory before live file events start arriving.
        self.rescan_files()
        self._start_fs_observer()

    def rescan_files(self):
        """Update this host's file inventory in the database from the data directories."""
        search_paths = [self.config.common_path_prefix / name for name in self.config.data_dirs]
        with self.conn.cursor() as cur:
            ingest.update_file_inventory(cur, self.config.hostname, search_paths)
        # Persist explicitly. NOTE(review): assumes a psycopg-3-style connection
        # (it exposes transaction()). Outside autocommit, an uncommitted
        # transaction left open here would make loop()'s transaction() blocks
        # nest as savepoints that are never committed. commit() is a no-op
        # when nothing is pending.
        self.conn.commit()
        self.log.info(f"Completed startup rescan of file inventory for {self.config.hostname} from {search_paths}")

    def ingest_line(self, line):
        """Debug-log ``line`` unless it mentions this device's own log file.

        The exclusion avoids an infinite loop of modifying the log file and
        getting notified of the modification. NOTE(review): ``line`` appears to
        be bytes (the file name is UTF-8 encoded before the test), and
        ``log_file_name`` is presumably provided by ``XDevice`` — confirm.
        """
        if self.log_file_name.encode('utf8') not in line:
            self.log.debug(line)

    def _drain_queue(self, q):
        """Return up to ``MAX_BATCH_RECORDS`` items from ``q``, stopping at the first quiet gap."""
        items = []
        while len(items) < self.MAX_BATCH_RECORDS:
            try:
                items.append(q.get(timeout=self.QUEUE_GET_TIMEOUT_SEC))
            except queue.Empty:
                break
        return items

    def loop(self):
        """Drain the telemetry, file-event, and user-log queues and insert them in one transaction."""
        telems = self._drain_queue(self.telem_queue)
        fs_events = self._drain_queue(self.fs_queue)
        user_logs = self._drain_queue(self.user_log_queue)
        self.records_since_startup += len(telems) + len(fs_events) + len(user_logs)

        with self.conn.transaction(), self.conn.cursor() as cur:
            ingest.batch_telem(cur, telems)
            ingest.batch_file_origins(cur, fs_events)
            ingest.batch_user_log(cur, user_logs)

        self.last_update_ts_sec = time.time()
        self.refresh_properties()
def __init__(self, host, events_queue, log_name)
Definition: dbIngest.py:32
def on_created(self, event)
Definition: dbIngest.py:46
def construct_message(self, stat_result, event, is_new_file=False)
Definition: dbIngest.py:37
def on_modified(self, event)
Definition: dbIngest.py:55
def rescan_files(self)
Definition: dbIngest.py:206
def launch_followers(self, dev)
Definition: dbIngest.py:110
def setup(self)
Definition: dbIngest.py:132
def ingest_line(self, line)
Definition: dbIngest.py:212
def refresh_properties(self)
Definition: dbIngest.py:125
def loop(self)
Definition: dbIngest.py:217
def _run_logdump_thread(logger_name, logdump_dir, logdump_args, name, message_queue, record_class)
Definition: dbIngest.py:68