API
dbIngest.py
Go to the documentation of this file.
1 import glob
2 from purepyindi2 import device, properties, constants, messages
3 from purepyindi2.messages import DefNumber, DefSwitch, DefText
4 import sys
5 import logging
6 import xconf
7 from magaox.indi.device import XDevice
8 from magaox.db.config import BaseDeviceConfig
9 from magaox.db import Telem, FileOrigin
10 from magaox.db import ingest
11 from magaox.utils import parse_iso_datetime_as_utc, creation_time_from_filename
12 
13 import json
14 import xconf
15 import subprocess
16 import queue
17 import socket
18 import threading
19 import pathlib
20 import time
21 import os.path
22 import os
23 import sys
24 import datetime
25 from datetime import timezone
26 from watchdog.observers import Observer, BaseObserverSubclassCallable
27 from watchdog.events import FileSystemEventHandler
28 
class NewXFilesHandler(FileSystemEventHandler):
    """Watchdog handler that turns file create/modify events into ``FileOrigin`` records.

    Every non-directory event is stat'ed, converted into a ``FileOrigin`` and
    put on the queue supplied at construction time.
    """

    def __init__(self, host, events_queue, log_name):
        """
        Parameters
        ----------
        host : str
            Value stored as ``origin_host`` on every record produced.
        events_queue : queue.Queue
            Destination for the constructed ``FileOrigin`` records.
        log_name : str
            Name of the logger this handler writes to.
        """
        self.host = host
        self.events_queue = events_queue
        self.log = logging.getLogger(log_name)

    def construct_message(self, event, is_new_file=False):
        """Build a ``FileOrigin`` describing the file at ``event.src_path``.

        *is_new_file* is accepted for callers' benefit but is not used here.

        Raises
        ------
        OSError
            If the file cannot be stat'ed (for example, it was already removed).
        """
        stat_result = os.stat(event.src_path)
        return FileOrigin(
            origin_host=self.host,
            origin_path=event.src_path,
            creation_time=creation_time_from_filename(event.src_path, stat_result=stat_result),
            # NOTE(review): fromtimestamp() without tz yields a naive local time;
            # confirm this is what the database column expects.
            modification_time=datetime.datetime.fromtimestamp(stat_result.st_mtime),
            size_bytes=stat_result.st_size,
        )

    def _enqueue(self, event, is_new_file):
        """Queue a ``FileOrigin`` for *event*, skipping directories and unreadable files."""
        if event.is_directory:
            return
        try:
            message = self.construct_message(event, is_new_file=is_new_file)
        except FileNotFoundError:
            # Short-lived files (temp/swap files) can vanish before we stat them;
            # letting the error escape would propagate into watchdog's dispatch thread.
            self.log.debug(f"File vanished before it could be inspected: {event.src_path}")
            return
        except OSError:
            self.log.warning(f"Could not stat {event.src_path}; skipping", exc_info=True)
            return
        self.events_queue.put(message)

    def on_created(self, event):
        """Record a newly created file."""
        self._enqueue(event, is_new_file=True)

    def on_modified(self, event):
        """Record a modification to an existing file."""
        self._enqueue(event, is_new_file=False)
54 
# Delay, in seconds, between checks for a device's first log file and before
# restarting a logdump follower that exited or failed.
RETRY_WAIT_SEC = 2
# NOTE(review): not referenced by any code in this file.
CREATE_CONNECTION_TIMEOUT_SEC = 2
# Seconds a terminated logdump child is given to exit before it is killed.
EXIT_TIMEOUT_SEC = 2


def _run_logdump_thread(logger_name, logdump_dir, logdump_args, name, message_queue):
    """Follow one device's telemetry with ``logdump -f`` forever (thread target).

    Waits until a file matching ``{logdump_dir}/{name}_*`` exists, then runs
    logdump in JSON follow mode and puts ``Telem.from_json(name, line)`` on
    *message_queue* for every line it prints.

    If logdump exits or anything raises, the problem is logged and the child
    process is stopped and reaped. The follower then restarts after
    ``RETRY_WAIT_SEC`` seconds. This function never returns.

    Parameters
    ----------
    logger_name : str
        Name of the logger used by this follower.
    logdump_dir : str
        Directory searched for the device's files; also passed as ``--dir``.
    logdump_args : tuple[str, ...]
        Executable plus leading options. ``--dir=...``, ``-J``, ``-f`` and
        *name* are appended.
    name : str
        Device name to follow.
    message_queue : queue.Queue
        Destination for parsed records.
    """
    log = logging.getLogger(logger_name)
    glob_pat = logdump_dir + f'/{name}_*'
    args = logdump_args + ('--dir=' + logdump_dir, '-J', '-f', name)
    if not glob.glob(glob_pat):
        log.debug(f"No matching files found for {glob_pat}")
    while True:
        # Only start logdump once the device has produced at least one file.
        while not glob.glob(glob_pat):
            time.sleep(RETRY_WAIT_SEC)
        p = None
        try:
            log.debug(f"Running logdump command {repr(' '.join(args))} for {name} in follow mode")
            p = subprocess.Popen(args, stdout=subprocess.PIPE, stdin=subprocess.DEVNULL, stderr=subprocess.DEVNULL, text=True)
            for line in p.stdout:
                message_queue.put(Telem.from_json(name, line))
            # Reaching EOF on stdout does not set returncode; wait() reaps the child and sets it.
            returncode = p.wait()
            if returncode != 0:
                raise RuntimeError(f"{name} logdump exited with {returncode} ({repr(' '.join(args))})")
            log.warning(f"{name} logdump exited with status 0; restarting follower")
        except Exception:
            log.exception(f"Exception in log/telem follower for {name}")
        finally:
            if p is not None:
                # After an exception mid-stream, logdump is still running in follow
                # mode. Stop it so it does not linger alongside the restarted one.
                if p.poll() is None:
                    p.terminate()
                    try:
                        p.wait(timeout=EXIT_TIMEOUT_SEC)
                    except subprocess.TimeoutExpired:
                        p.kill()
                        p.wait()
                p.stdout.close()
        # Back off so a logdump that fails immediately (e.g. a missing
        # executable) does not spin this loop and flood the log.
        time.sleep(RETRY_WAIT_SEC)
79 
@xconf.config
class dbIngestConfig(BaseDeviceConfig):
    # Settings specific to the dbIngest device, layered on BaseDeviceConfig.

    # Process list naming the devices whose telemetry is followed; every '%s'
    # is substituted with $MAGAOX_ROLE (or '') when the device is set up.
    proclist : str = xconf.field(
        default="/opt/MagAOX/config/proclist_%s.txt",
        help="Path to process list file, %s will be replaced with the value of $MAGAOX_ROLE (or an empty string if absent from the environment)",
    )

    # logdump binary launched once per device to stream its telemetry.
    logdump_exe : str = xconf.field(
        default="/opt/MagAOX/bin/logdump",
        help="logdump (a.k.a. teldump) executable to use",
    )
84 
class dbIngest(XDevice):
    """Device that writes MagAO-X telemetry and file-inventory events to the database.

    ``setup`` starts one logdump follower thread per device in the role's
    process list, plus a watchdog observer over ``config.data_dirs``. ``loop``
    drains both queues, writes the batches in one transaction and publishes
    ingest statistics as INDI properties.
    """
    config : dbIngestConfig
    telem_threads : list[tuple[str, threading.Thread]]  # (device name, follower thread)
    fs_observer : BaseObserverSubclassCallable
    telem_queue : queue.Queue  # Telem records produced by follower threads
    fs_queue : queue.Queue  # FileOrigin records produced by NewXFilesHandler
    last_update_ts_sec : float
    startup_ts_sec : float
    records_since_startup : int

    def launch_follower(self, dev):
        """Start a daemon thread that follows *dev*'s telemetry into ``self.telem_queue``.

        The thread runs ``_run_logdump_thread`` on ``/opt/MagAOX/telem`` with
        ``--ext=.bintel`` and is recorded in ``self.telem_threads``.
        """
        args = (
            self.log.name + '.' + dev,
            '/opt/MagAOX/telem',
            (self.config.logdump_exe, '--ext=.bintel'),
            dev,
            self.telem_queue,
        )
        telem_thread = threading.Thread(target=_run_logdump_thread, args=args, daemon=True)
        telem_thread.start()
        self.log.debug(f"Watching {dev} for incoming telem")
        self.telem_threads.append((dev, telem_thread))

    def refresh_properties(self):
        """Publish the last-update timestamp and record counters."""
        self.properties['last_update']['timestamp'] = self.last_update_ts_sec
        self.update_property(self.properties['last_update'])
        self.properties['records']['since_startup'] = self.records_since_startup
        # Average rate since setup() finished launching the telemetry followers.
        self.properties['records']['per_sec'] = self.records_since_startup / (time.time() - self.startup_ts_sec)
        self.update_property(self.properties['records'])

    def _read_device_names(self):
        """Return the set of device names listed in this role's process list.

        Blank lines and lines starting with '#' are skipped. The device name is
        the first whitespace-separated field of every other line.

        Raises
        ------
        RuntimeError
            If the process list does not exist, or a non-comment line has fewer
            than two fields.
        """
        role = os.environ.get('MAGAOX_ROLE', '')
        proclist = pathlib.Path(self.config.proclist.replace('%s', role))
        if not proclist.exists():
            raise RuntimeError(f"No process list at {proclist}")

        device_names = set()
        with proclist.open() as fh:
            for line in fh:
                stripped = line.strip()
                if not stripped or stripped.startswith('#'):
                    continue
                parts = line.split(None, 1)
                if len(parts) != 2:
                    raise RuntimeError(f"Got malformed proclist line: {repr(line)}")
                device_names.add(parts[0])
        return device_names

    def setup(self):
        """Create properties, connect to the database and start all watchers.

        Raises
        ------
        RuntimeError
            Propagated from reading the process list (missing or malformed).
        """
        self.last_update_ts_sec = time.time()
        self.records_since_startup = 0
        self.records_per_sec = 0.0
        last_update = properties.NumberVector(name="last_update", perm=constants.PropertyPerm.READ_ONLY)
        last_update.add_element(DefNumber(
            name="timestamp",
            _value=self.last_update_ts_sec,
            min=0.0, max=1e200, format='%f',
            step=1e-6,
        ))
        self.add_property(last_update)

        records = properties.NumberVector(name="records", perm=constants.PropertyPerm.READ_ONLY)
        records.add_element(DefNumber(
            name="per_sec",
            _value=0.0,
            min=0.0, max=1e200, format='%f',
            step=1e-6,
        ))
        records.add_element(DefNumber(
            name="since_startup",
            _value=0,
            min=0, max=1_000_000_000, format='%i',
            step=1,
        ))
        self.add_property(records)

        self.conn = self.config.database.connect()

        device_names = self._read_device_names()

        self.telem_queue = queue.Queue()
        self.telem_threads = []
        for dev in sorted(device_names):
            self.launch_follower(dev)

        self.startup_ts_sec = time.time()

        # Start watching before the inventory rescan. Otherwise files created
        # while the rescan runs would be missed; this way they arrive as events.
        self.fs_queue = queue.Queue()
        event_handler = NewXFilesHandler(self.config.hostname, self.fs_queue, self.log.name + '.fs_observer')
        self.fs_observer = Observer()
        for dirpath in self.config.data_dirs:
            self.fs_observer.schedule(event_handler, dirpath, recursive=True)
            self.log.info(f"Watching {dirpath} for changes")
        self.fs_observer.start()

        # rescan for inventory
        self.rescan_files()

    def rescan_files(self):
        """Refresh the database file inventory for this host's data directories."""
        with self.conn.cursor() as cur:
            ingest.update_file_inventory(cur, self.config.hostname, self.config.data_dirs)
        self.log.info(f"Completed startup rescan of file inventory for {self.config.hostname}")

    def ingest_line(self, line):
        """Debug-log *line* unless it contains this device's own log file name.

        NOTE(review): ``self.log_file_name`` is not assigned anywhere in this
        file; presumably XDevice provides it (confirm). *line* must be bytes,
        since it is searched for the UTF-8 encoded name. Nothing in this file
        calls this method.
        """
        # avoid infinite loop of modifying log file and getting notified of the modification
        if self.log_file_name.encode('utf8') not in line:
            self.log.debug(line)

    @staticmethod
    def _drain_queue(q, max_wait_sec=0.1):
        """Return the items taken from *q* within at most *max_wait_sec* seconds.

        Collection stops early once *q* stays empty for the rest of the window.
        The overall bound matters: draining "until the queue goes idle" never
        finishes while producers keep up a steady stream, so nothing would
        ever be written.
        """
        items = []
        deadline = time.monotonic() + max_wait_sec
        while (remaining := deadline - time.monotonic()) > 0:
            try:
                items.append(q.get(timeout=remaining))
            except queue.Empty:
                break
        return items

    def loop(self):
        """Drain pending telemetry and file events, write them in one transaction, publish stats."""
        telems = self._drain_queue(self.telem_queue)
        fs_events = self._drain_queue(self.fs_queue)
        self.records_since_startup += len(telems) + len(fs_events)

        with self.conn.transaction():
            with self.conn.cursor() as cur:
                ingest.batch_telem(cur, telems)
                ingest.batch_file_origins(cur, fs_events)

        self.last_update_ts_sec = time.time()
        self.refresh_properties()
def __init__(self, host, events_queue, log_name)
Definition: dbIngest.py:30
def on_created(self, event)
Definition: dbIngest.py:45
def construct_message(self, event, is_new_file=False)
Definition: dbIngest.py:35
def on_modified(self, event)
Definition: dbIngest.py:50
def rescan_files(self)
Definition: dbIngest.py:175
def setup(self)
Definition: dbIngest.py:109
def ingest_line(self, line)
Definition: dbIngest.py:180
def launch_follower(self, dev)
Definition: dbIngest.py:95
def refresh_properties(self)
Definition: dbIngest.py:102
def loop(self)
Definition: dbIngest.py:185
def _run_logdump_thread(logger_name, logdump_dir, logdump_args, name, message_queue)
Definition: dbIngest.py:59