2 from purepyindi2
import device, properties, constants, messages
3 from purepyindi2.messages
import DefNumber, DefSwitch, DefText
7 from magaox.indi.device
import XDevice
8 from magaox.constants
import DEFAULT_PREFIX
9 from magaox.db.config
import BaseDeviceConfig
10 from magaox.db
import Telem, FileOrigin, UserLog
11 from magaox.db
import ingest
12 from magaox.utils
import parse_iso_datetime_as_utc, creation_time_from_filename
27 from datetime
import timezone
28 from watchdog.observers
import Observer, BaseObserverSubclassCallable
29 from watchdog.events
import FileSystemEventHandler
32 def __init__(self, host, events_queue, log_name):
35 self.
loglog = logging.getLogger(log_name)
39 origin_host=self.
hosthost,
40 origin_path=event.src_path,
41 creation_time=creation_time_from_filename(event.src_path, stat_result=stat_result),
42 modification_time=datetime.datetime.fromtimestamp(stat_result.st_mtime),
43 size_bytes=stat_result.st_size,
47 if event.is_directory:
50 stat_result = os.stat(event.src_path)
51 except FileNotFoundError:
56 if event.is_directory:
59 stat_result = os.stat(event.src_path)
60 except FileNotFoundError:
65 CREATE_CONNECTION_TIMEOUT_SEC = 2
70 log = logging.getLogger(logger_name)
71 glob_pat = logdump_dir + f
'/{name}_*'
72 has_no_logs = len(glob.glob(glob_pat)) == 0
74 log.debug(f
"No matching files found for {glob_pat}")
76 while has_no_logs := len(glob.glob(glob_pat)) == 0:
77 time.sleep(RETRY_WAIT_SEC)
79 args = logdump_args + (
'--dir='+logdump_dir,
'-J',
'-f', name)
80 log.debug(f
"Running logdump command {repr(' '.join(args))} for {name} in follow mode")
81 p = subprocess.Popen(args, stdout=subprocess.PIPE, stdin=subprocess.DEVNULL, stderr=subprocess.DEVNULL, text=
True)
83 log.debug(f
"Log line read: {line}")
84 message = record_class.from_json(name, line)
85 message_queue.put(message)
87 raise RuntimeError(f
"{name} logdump exited with {p.returncode} ({repr(' '.join(args))})")
88 except Exception
as e:
89 log.exception(f
"Exception in log/telem follower for {name}")
93 proclist : str = xconf.field(default=
"/opt/MagAOX/config/proclist_%s.txt", help=
"Path to process list file, %s will be replaced with the value of $MAGAOX_ROLE (or an empty string if absent from the environment)")
94 logdump_exe : str = xconf.field(default=
"/opt/MagAOX/bin/logdump", help=
"logdump (a.k.a. teldump) executable to use")
97 config : dbIngestConfig
98 telem_threads : list[tuple[str, threading.Thread]]
99 telem_queue : queue.Queue
100 fs_observer : BaseObserverSubclassCallable
101 fs_queue : queue.Queue
102 last_update_ts_sec : float
103 startup_ts_sec : float
104 records_since_startup : float
107 user_log_threads : list[tuple[str, threading.Thread]]
108 user_log_queue : queue.Queue
111 telem_args = self.log.name +
'.' + dev,
'/opt/MagAOX/telem', (self.config.logdump_exe,
'--ext=.bintel'), dev, self.
telem_queuetelem_queue, Telem
112 telem_thread = threading.Thread(target=_run_logdump_thread, args=telem_args, daemon=
True)
114 self.log.debug(f
"Watching {dev} for incoming telem")
118 if dev ==
"observers":
119 ULog_args = self.log.name +
'.' + dev,
'/opt/MagAOX/logs', (self.config.logdump_exe,
'--ext=.binlog'), dev, self.
user_log_queueuser_log_queue, UserLog
120 user_log_thread = threading.Thread(target=_run_logdump_thread, args= ULog_args, daemon=
True)
121 user_log_thread.start()
122 self.log.debug(f
"Watching {dev} for incoming user logs")
126 self.properties[
'last_update'][
'timestamp'] = self.
last_update_ts_seclast_update_ts_sec
127 self.update_property(self.properties[
'last_update'])
130 self.update_property(self.properties[
'records'])
136 last_update = properties.NumberVector(name=
"last_update", perm=constants.PropertyPerm.READ_ONLY)
137 last_update.add_element(DefNumber(
140 min=0.0, max=1e200, format=
'%f',
143 self.add_property(last_update)
145 records = properties.NumberVector(name=
"records", perm=constants.PropertyPerm.READ_ONLY)
146 records.add_element(DefNumber(
149 min=0.0, max=1e200, format=
'%f',
152 records.add_element(DefNumber(
153 name=
"since_startup",
155 min=0, max=1_000_000_000, format=
'%i',
158 self.add_property(records)
160 self.
connconn = self.config.database.connect()
162 role = os.environ.get(
'MAGAOX_ROLE',
'')
163 proclist = pathlib.Path(self.config.proclist.replace(
'%s', role))
164 if not proclist.exists():
165 raise RuntimeError(f
"No process list at {proclist}")
169 with proclist.open()
as fh:
173 if line.strip()[0] ==
'#':
175 parts = line.split(
None, 1)
177 raise RuntimeError(f
"Got malformed proclist line: {repr(line)}")
178 device_names.add(parts[0])
186 for dev
in device_names:
197 for dirname
in self.config.data_dirs:
198 dirpath = self.config.common_path_prefix / dirname
199 if not dirpath.exists():
200 self.log.debug(f
"No {dirpath} to watch")
202 self.
fs_observerfs_observer.schedule(event_handler, dirpath, recursive=
True)
203 self.log.info(f
"Watching {dirpath} for changes")
207 search_paths = [self.config.common_path_prefix / name
for name
in self.config.data_dirs]
208 with self.
connconn.cursor()
as cur:
209 ingest.update_file_inventory(cur, self.config.hostname, search_paths)
210 self.log.info(f
"Completed startup rescan of file inventory for {self.config.hostname} from {search_paths}")
214 if self.log_file_name.encode(
'utf8')
not in line:
220 while rec := self.
telem_queuetelem_queue.get(timeout=0.1):
228 while rec := self.
fs_queuefs_queue.get(timeout=0.1):
229 fs_events.append(rec)
238 user_logs.append(rec)
243 with self.
connconn.transaction():
244 cur = self.
connconn.cursor()
245 ingest.batch_telem(cur, telems)
246 ingest.batch_file_origins(cur, fs_events)
247 ingest.batch_user_log(cur, user_logs)
249 this_ts_sec = time.time()
def __init__(self, host, events_queue, log_name)
def on_created(self, event)
def construct_message(self, stat_result, event, is_new_file=False)
def on_modified(self, event)
def launch_followers(self, dev)
def ingest_line(self, line)
def refresh_properties(self)
def _run_logdump_thread(logger_name, logdump_dir, logdump_args, name, message_queue, record_class)