API
 
Loading...
Searching...
No Matches
core.py
Go to the documentation of this file.
1import sys
2from typing import Optional, Union
3from random import choice
4import time
5from functools import partial
6from enum import Enum
7import logging
8import os
9import os.path
10import pathlib
11import pprint
12import re
13import xconf
14from xconf.contrib import DirectoryConfig
15from purepyindi2 import device, properties, constants, messages
16from purepyindi2.messages import DefNumber, DefSwitch, DefText
17from magaox.indi.device import XDevice, BaseConfig
18
19from .personality import Personality, Transition, Operation, SSML, Recording
20from .opentts_bridge import speak, ssml_to_wav
21
# Module-level logger (instances also carry their own ``self.log``).
log = logging.getLogger(__name__)
# Directory containing this module; personality files are resolved against it.
HERE = os.path.dirname(__file__)
# Non-greedy match from "<" to the next ">". No DOTALL flag is set, so "."
# does not match a newline and a tag split across lines is left untouched.
TAGS_RE = re.compile('<.*?>')


def drop_xml_tags(raw_xml):
    """Return ``raw_xml`` with every ``<...>`` span removed.

    Only the tags themselves are stripped; the text between an opening and
    closing tag is kept.

    Parameters
    ----------
    raw_xml : str
        Markup to strip.

    Returns
    -------
    str
        ``raw_xml`` with all matches of ``TAGS_RE`` replaced by the empty
        string.
    """
    return TAGS_RE.sub('', raw_xml)
28
@xconf.config
class AudibleAlertsConfig(BaseConfig):
    """Configuration options for the audible alerts device."""
    # Idle time after which the main loop may play a random utterance.
    random_utterance_interval_sec : Union[float, int] = xconf.field(default=15 * 60, help="Seconds since last (real or random) utterance before a random utterance should play")
    # Created with mkdir() during setup and handed to the synthesis helpers.
    cache : pathlib.Path = xconf.field(default=pathlib.Path("/tmp/audibleAlerts_cache"), help="Directory where synthesis output is cached")
33
def contains_substitutions(text):
    """Return True if ``text`` contains a ``{`` or ``}`` character.

    Either brace on its own is enough; the braces do not have to be paired
    or enclose anything.

    Parameters
    ----------
    text : str
        Text (for example speech markup) to inspect.

    Returns
    -------
    bool
    """
    return '{' in text or '}' in text
36
class AudibleAlerts(XDevice):
    """Device that queues speech requests and plays them back.

    Requests come from INDI property callbacks (reactions, walk-ups, the
    soundboard and the free-text ``speak`` switch) and are consumed by
    ``loop``.
    """
    config : AudibleAlertsConfig
    personality : Personality
    # Tuples of (callback, device_name, property_name, transition) for every
    # reaction callback registered on the client, kept so they can be
    # unregistered when another personality is loaded.
    _cb_handles : set
    # Pending utterances; ``loop`` takes them from the front with pop(0).
    _speech_requests : list[Union[SSML, Recording]]
    soundboard_sw_prop : Optional[properties.SwitchVector] = None  # unset during setup until first personality is loaded
    default_voice : str = "coqui-tts:en_ljspeech"  # overridden by personality when loaded
    personalities : list[str] = ['onsky', 'lab_mode',]
    active_personality : str = "lab_mode"
    # Passed to ``speak`` / ``ssml_to_wav`` together with ``default_voice``.
    api_url : str = "http://localhost:5500/"
    # While True, utterances are only logged instead of being spoken.
    mute : bool = True
    latch_transitions : dict[Transition, constants.AnyIndiValue]  # store last value when triggering a transition so subsequent messages don't trigger too
    # Consulted by ``reaction_handler`` for its per-transition debounce check.
    per_transition_cooldown_ts : dict[Transition, float]
    # time.time() of the most recent utterance; gates random utterances.
    last_utterance_ts : float = 0
    last_utterance_chosen : Optional[str] = None
    observers_device : str = 'observers'
    # Maps the name of a walk-up property to the element last announced for it.
    last_walkup : Optional[dict[str,str]] = None
    last_walkup_ts : float = 0
    walkup_double_trigger_timeout_sec : float = 30
56
58 if any(sr == x for x in self._speech_requests_speech_requests):
59 log.warn(f"Duplicated speech request while already enqueued: {sr}")
60 return
62
63 def handle_speech_text(self, existing_property, new_message):
64 if 'target' in new_message and new_message['target'] != existing_property['current']:
65 self.log.debug(f"Setting new speech text: {new_message['target']}")
66 existing_property['current'] = new_message['target']
67 existing_property['target'] = new_message['target']
68 self.update_property(existing_property)
69
70 def handle_speech_request(self, existing_property, new_message):
71 self.log.debug(f"{new_message['request']=}")
72 if new_message['request'] is constants.SwitchState.ON:
73 current_text = self.properties['speech_text']['current']
74 if current_text is not None and len(current_text.strip()) != 0:
75 st = '<speak>' + current_text + '</speak>'
77 self.telem("speech_request", {"text": current_text})
78 self.update_property(existing_property) # ensure the request switch turns back off at the client
79
80 def handle_reload_request(self, existing_property, new_message):
81 if new_message['request'] is constants.SwitchState.ON:
82 self.telem("reload_personality", {"name": self.active_personalityactive_personality})
84 self.update_property(existing_property) # ensure the request switch turns back off at the client
85
86 def handle_mute_toggle(self, existing_property, new_message):
87 existing_property['toggle'] = new_message['toggle']
88 self.mutemute = new_message['toggle'] is constants.SwitchState.ON
89 if self.mutemute:
90 self.log.info("Muted")
91 else:
92 self.log.info("Unmuted")
93 self.update_property(existing_property)
94 self.telem("mute_toggle", {"mute": self.mutemute})
95
96 def walkup_handler(self, new_message):
97 which_updated = new_message.name
98 for element_name in new_message:
99 value = new_message[element_name]
100 if value == constants.SwitchState.ON:
101 if element_name == self.last_walkuplast_walkup[which_updated]:
102 self.log.debug(f"Already did {self.last_walkup[which_updated]}")
103 return
104 if element_name in self.personalitypersonality.walkups:
105 utterance = choice(self.personalitypersonality.walkups[element_name])
106 self.log.info(f"Queueing walk-up {utterance} for {element_name}")
108 self.last_walkuplast_walkup[which_updated] = element_name
109 self.enqueue_speech_request(utterance)
110
111 def reaction_handler(self, new_message, element_name, transition, utterance_choices):
112 if not isinstance(new_message, messages.IndiSetMessage):
113 return
114 if element_name not in new_message:
115 return
116 value = new_message[element_name]
117 if new_message.device == 'labrules':
118 self.log.debug(f"Judging reaction for {element_name} change to {repr(value)} using {transition}")
119 self.log.debug(f"before check {self.latch_transitions=}")
120 last_value = self.latch_transitionslatch_transitions.get(transition)
121 if new_message.device == 'labrules':
122 self.log.debug(f"{new_message}\n{transition.compare(value)=}, last value was {last_value}, {value != last_value=} {(not transition.compare(last_value))=}")
123 if transition.compare(value) and (
124 # if there's no operation, we fire on any change,
125 # but make sure it's actually a change
126 (transition.op is None and value != last_value) or
127 # don't fire if we already compared true on the last value:
128 (not transition.compare(last_value))
129 ):
130 self.latch_transitionslatch_transitions[transition] = value
131 if new_message.device == 'labrules':
132 self.log.debug(f"after update {self.latch_transitions=}")
133 self.log.debug(f"latched {transition=} with {value=}")
134 last_transition_ts = self.per_transition_cooldown_tsper_transition_cooldown_ts.get(transition, 0)
135 sec_since_trigger = time.time() - last_transition_ts
136 debounce_expired = sec_since_trigger > transition.debounce_sec
137 self.log.debug(f"Checking for debounce: {sec_since_trigger=} {debounce_expired=}")
138 if debounce_expired:
139 utterance = choice(utterance_choices)
140 self.log.debug(f"Submitting speech request: {utterance}")
141 self.enqueue_speech_request(utterance)
142 else:
143 self.log.debug(f"Would have spoken, but it's only been {sec_since_trigger=}")
144 elif transition.compare(last_value) and not transition.compare(value):
145 if new_message.device == 'labrules':
146 self.log.debug(f"un-latch {transition}, so next time we change to a value that compares True we trigger again. ({last_value=} {value=})")
147 self.log.debug(f"before {self.latch_transitions=}")
148 del self.latch_transitionslatch_transitions[transition]
149 if new_message.device == 'labrules':
150 self.log.debug(f"after {self.latch_transitions=}")
151 else:
152 if new_message.device == 'labrules':
153 self.log.debug(f"Got {new_message.device}.{new_message.name} but {transition=} did not match")
154
155 def preprocess(self, speech):
156 if isinstance(speech, Recording):
157 return speech
158 speech_text = speech.markup
159 substitutables = re.findall(r"({[^}]+})", speech.markup)
160 for sub in substitutables:
161 indi_id = sub[1:-1]
162 value = self.client[indi_id]
163 if hasattr(value, 'value'):
164 value = value.value
165 self.log.debug(f"Replacing {repr(sub)} with {value=}")
166 if value is not None:
167 try:
168 value = float(value)
169 value = "{:.1f}".format(value)
170 except (TypeError, ValueError):
171 value = str(value)
172 speech_text = speech_text.replace(sub, value)
173 return SSML(speech_text)
174
175 def handle_personality_switch(self, prop : properties.IndiProperty, new_message):
176 if not isinstance(new_message, messages.IndiNewMessage):
177 return
178 active_personality = None
179 for elem in prop:
180 prop[elem] = constants.SwitchState.OFF
181 if elem in new_message and new_message[elem] is constants.SwitchState.ON:
182 active_personality = elem
183
184 if active_personality is not None:
185 self.log.info(f"Switching to {active_personality=}")
186 self.load_personality(active_personality)
188 self.update_property(prop)
189
190 def handle_soundboard_switch(self, prop: properties.IndiProperty, new_message):
191 print(new_message)
192 if not isinstance(new_message, messages.IndiNewMessage):
193 return
194 for elem in new_message:
195 if new_message[elem] is constants.SwitchState.ON:
196 srq = self.personalitypersonality.soundboard[elem]
197 self.log.info(f"Soundboard requested {srq}")
198 self.enqueue_speech_request(srq)
199 # set everything off again
200 self.update_property(prop)
201
202 def load_personality(self, personality_name):
203 personality_file = os.path.join(HERE, "personalities", f"{personality_name}.xml")
204 for cb, device_name, property_name, _ in self._cb_handles_cb_handles:
205 try:
206 self.client.unregister_callback(cb, device_name=device_name, property_name=property_name)
207 except ValueError:
208 log.exception(f"Tried to remove {cb=} {device_name=} {property_name=}")
210 if self.soundboard_sw_propsoundboard_sw_prop is not None:
211 self.delete_property(self.soundboard_sw_propsoundboard_sw_prop)
212
213 self.log.info(f"Loading personality from {personality_file}")
215
217 name="soundboard",
220 )
221 for btn_name in self.personalitypersonality.soundboard:
223 self.add_property(self.soundboard_sw_propsoundboard_sw_prop, callback=self.handle_soundboard_switch)
224
226
227 for reaction in self.personalitypersonality.reactions:
228 device_name, property_name, element_name = reaction.indi_id.split('.')
230 for t in reaction.transitions:
231 cb = partial(self.reaction_handler, element_name=element_name, transition=t, utterance_choices=reaction.transitions[t])
232 self.client.register_callback(
233 cb,
234 device_name=device_name,
235 property_name=property_name
236 )
237 self._cb_handles_cb_handles.add((cb, device_name, property_name, t))
238 self.log.debug(f"Registered reaction handler on {device_name=} {property_name=} {element_name=} using transition {t}")
239 for idx, utterance in enumerate(reaction.transitions[t]):
240 self.log.debug(f"{reaction.indi_id}: {t}: {utterance}")
241 if isinstance(utterance, SSML):
243 result = ssml_to_wav(utterance.markup, self.default_voicedefault_voice, self.api_urlapi_url, self.config.cache)
244 self.log.debug(f"Caching synthesis to {result}")
245 else:
246 self.log.debug(f"Cannot pre-cache because there are substitutions to be made")
247 if self.walkup_handlerwalkup_handler not in self.client.callbacks[self.observers_deviceobservers_device]['operators']:
249 if self.walkup_handlerwalkup_handler not in self.client.callbacks[self.observers_deviceobservers_device]['observers']:
251 self.active_personalityactive_personality = personality_name
252 self.telem("load_personality", {'name': personality_name})
253 self.send_all_properties()
254
255 def setup(self):
259 self._cb_handles_cb_handles = set()
261 self.last_walkuplast_walkup = {'observers': '', 'operators': ''}
262
263 while self.client.status is not constants.ConnectionStatus.CONNECTED:
264 self.log.info("Waiting for connection...")
265 time.sleep(1)
266 self.log.info("Connected.")
267 self.log.debug(f"Caching synthesis output to {self.config.cache}")
268 self.config.cache.mkdir(exist_ok=True)
270
272 name="mute",
275 )
276 sv.add_element(DefSwitch(name=f"toggle", _value=constants.SwitchState.ON if self.mutemute else constants.SwitchState.OFF))
277 self.add_property(sv, callback=self.handle_mute_toggle)
278
280 name="personality",
283 )
284 for pers in self.personalities:
285 print(f"{pers=}")
287 self.add_property(sv, callback=self.handle_personality_switch)
288
289 speech_text = properties.TextVector(name="speech_text", perm=constants.PropertyPerm.READ_WRITE)
291 name="current",
292 _value=None,
293 ))
295 name="target",
296 _value=None,
297 ))
298 self.add_property(speech_text, callback=self.handle_speech_text)
299
300 speech_request = properties.SwitchVector(
301 name="speak",
303 )
304 speech_request.add_element(DefSwitch(name="request", _value=constants.SwitchState.OFF))
305 self.add_property(speech_request, callback=self.handle_speech_request)
306
307 reload_request = properties.SwitchVector(
308 name="reload_personality",
310 )
311 reload_request.add_element(DefSwitch(name="request", _value=constants.SwitchState.OFF))
312 self.add_property(reload_request, callback=self.handle_reload_request)
313
314 self.log.info("Set up complete")
315
316 def loop(self):
318 speech = self.preprocess(self._speech_requests_speech_requests.pop(0))
319 if self.mutemute:
320 self.log.debug(f"Would have said: {repr(speech)}, but muted")
321 else:
322 self.log.info(f"Speaking: {repr(speech)}")
323 speak(speech, self.default_voicedefault_voice, self.api_urlapi_url, self.config.cache)
324 self.log.debug("Speech complete")
325 self.last_utterance_tslast_utterance_ts = time.time() # update timestamp to prevent random utterances
326 if time.time() - self.last_utterance_tslast_utterance_ts > self.config.random_utterance_interval_sec and len(self.personalitypersonality.random_utterances):
327 next_utterance = choice(self.personalitypersonality.random_utterances)
328 while next_utterance == self.last_utterance_chosenlast_utterance_chosen:
329 next_utterance = choice(self.personalitypersonality.random_utterances)
332 if self.mutemute:
333 self.log.debug(f"Would have said: {repr(next_utterance)}, but muted")
334 else:
335 self.log.info(f"Randomly spouting off: {repr(next_utterance)}")
336 speak(next_utterance, self.default_voicedefault_voice, self.api_urlapi_url, self.config.cache)
337
# Used to make the pyproject.toml just a little simpler,
# with fewer repetitions of the app name:
load_personality(self, personality_name)
Definition core.py:202
handle_soundboard_switch(self, properties.IndiProperty prop, new_message)
Definition core.py:190
reaction_handler(self, new_message, element_name, transition, utterance_choices)
Definition core.py:111
handle_speech_request(self, existing_property, new_message)
Definition core.py:70
handle_mute_toggle(self, existing_property, new_message)
Definition core.py:86
AudibleAlertsConfig config
Definition core.py:38
handle_personality_switch(self, properties.IndiProperty prop, new_message)
Definition core.py:175
handle_speech_text(self, existing_property, new_message)
Definition core.py:63
handle_reload_request(self, existing_property, new_message)
Definition core.py:80
walkup_handler(self, new_message)
Definition core.py:96
drop_xml_tags(raw_xml)
Definition core.py:26
contains_substitutions(text)
Definition core.py:34