API
 
Loading...
Searching...
No Matches
core.py
Go to the documentation of this file.
1import sys
2from typing import Optional, Union
3from random import choice
4import time
5from functools import partial
6from enum import Enum
7import logging
8import os
9import os.path
10import pprint
11import re
12import xconf
13from xconf.contrib import DirectoryConfig
14from purepyindi2 import device, properties, constants, messages
15from purepyindi2.messages import DefNumber, DefSwitch, DefText
16from magaox.indi.device import XDevice, BaseConfig
17
18from .personality import Personality, Transition, Operation, SSML, Recording
19from .opentts_bridge import speak, ssml_to_wav
20
# Module-level logger for code paths that do not go through a device's ``self.log``.
log = logging.getLogger(__name__)
# Directory containing this module; personality XML files are looked up beneath it.
HERE = os.path.dirname(__file__)
# Matches a single ``<...>`` tag. ``[^>]*`` (rather than ``.*?``) lets a tag
# span several lines, since ``.`` does not match a newline by default.
TAGS_RE = re.compile(r'<[^>]*>')


def drop_xml_tags(raw_xml):
    """Return *raw_xml* with every ``<...>`` tag removed.

    Only the tags themselves are dropped; the text between them is kept
    unchanged. A ``<`` that is never closed by ``>`` is left in place.
    """
    return TAGS_RE.sub('', raw_xml)
27
@xconf.config
class AudibleAlertsConfig(BaseConfig):
    """Configuration for the audible alerts device.

    Adds the random-utterance interval and the synthesis cache directory
    to the options inherited from ``BaseConfig``.
    """
    random_utterance_interval_sec : Union[float, int] = xconf.field(default=15 * 60, help="Seconds since last (real or random) utterance before a random utterance should play")
    cache : DirectoryConfig = xconf.field(default=DirectoryConfig(path="/tmp/audibleAlerts_cache"), help="Directory in which speech synthesis output is cached")
32
34 return '{' in text or '}' in text
35
36class AudibleAlerts(XDevice):
37 config : AudibleAlertsConfig
38 personality : Personality
39 _cb_handles : set
40 _speech_requests : list[Union[SSML, Recording]]
41 soundboard_sw_prop : properties.SwitchVector = None
42 default_voice : str = "coqui-tts:en_ljspeech" # overridden by personality when loaded
43 personalities : list[str] = ['default', 'lab_mode',]
44 active_personality : str = "default"
45 api_url : str = "http://localhost:5500/"
46 mute : bool = False
47 latch_transitions : dict[Transition, constants.AnyIndiValue] # store last value when triggering a transition so subsequent messages don't trigger too
48 per_transition_cooldown_ts : dict[Transition, float]
49 last_utterance_ts : float = 0
50 last_utterance_chosen : Optional[str] = None
51 observers_device : str = 'observers'
52 last_walkup : Optional[dict[str,str]] = None
53 last_walkup_ts : float = 0
54 walkup_double_trigger_timeout_sec : float = 30
55
57 if any(sr == x for x in self._speech_requests_speech_requests):
58 log.warn(f"Duplicated speech request while already enqueued: {sr}")
59 return
61
62 def handle_speech_text(self, existing_property, new_message):
63 if 'target' in new_message and new_message['target'] != existing_property['current']:
64 self.log.debug(f"Setting new speech text: {new_message['target']}")
65 existing_property['current'] = new_message['target']
66 existing_property['target'] = new_message['target']
67 self.update_property(existing_property)
68
69 def handle_speech_request(self, existing_property, new_message):
70 self.log.debug(f"{new_message['request']=}")
71 if new_message['request'] is constants.SwitchState.ON:
72 current_text = self.properties['speech_text']['current']
73 if current_text is not None and len(current_text.strip()) != 0:
74 st = '<speak>' + current_text + '</speak>'
76 self.telem("speech_request", {"text": current_text})
77 self.update_property(existing_property) # ensure the request switch turns back off at the client
78
79 def handle_reload_request(self, existing_property, new_message):
80 if new_message['request'] is constants.SwitchState.ON:
81 self.telem("reload_personality", {"name": self.active_personalityactive_personality})
83 self.update_property(existing_property) # ensure the request switch turns back off at the client
84
85 def handle_mute_toggle(self, existing_property, new_message):
86 existing_property['toggle'] = new_message['toggle']
87 self.mutemute = new_message['toggle'] is constants.SwitchState.ON
88 if self.mutemute:
89 self.log.info("Muted")
90 else:
91 self.log.info("Unmuted")
92 self.update_property(existing_property)
93 self.telem("mute_toggle", {"mute": self.mutemute})
94
95 def walkup_handler(self, new_message):
96 which_updated = new_message.name
97 for element_name in new_message:
98 value = new_message[element_name]
99 if value == constants.SwitchState.ON:
100 if element_name == self.last_walkuplast_walkup[which_updated]:
101 self.log.debug(f"Already did {self.last_walkup[which_updated]}")
102 return
103 if element_name in self.personalitypersonality.walkups:
104 utterance = choice(self.personalitypersonality.walkups[element_name])
105 self.log.info(f"Queueing walk-up {utterance} for {element_name}")
107 self.last_walkuplast_walkup[which_updated] = element_name
108 self.enqueue_speech_request(utterance)
109
110 def reaction_handler(self, new_message, element_name, transition, utterance_choices):
111 if not isinstance(new_message, messages.IndiSetMessage):
112 return
113 if element_name not in new_message:
114 return
115 value = new_message[element_name]
116 if new_message.device == 'labrules':
117 self.log.debug(f"Judging reaction for {element_name} change to {repr(value)} using {transition}")
118 self.log.debug(f"before check {self.latch_transitions=}")
119 last_value = self.latch_transitionslatch_transitions.get(transition)
120 if new_message.device == 'labrules':
121 self.log.debug(f"{new_message}\n{transition.compare(value)=}, last value was {last_value}, {value != last_value=} {(not transition.compare(last_value))=}")
122 if transition.compare(value) and (
123 # if there's no operation, we fire on any change,
124 # but make sure it's actually a change
125 (transition.op is None and value != last_value) or
126 # don't fire if we already compared true on the last value:
127 (not transition.compare(last_value))
128 ):
129 self.latch_transitionslatch_transitions[transition] = value
130 if new_message.device == 'labrules':
131 self.log.debug(f"after update {self.latch_transitions=}")
132 self.log.debug(f"latched {transition=} with {value=}")
133 last_transition_ts = self.per_transition_cooldown_tsper_transition_cooldown_ts.get(transition, 0)
134 sec_since_trigger = time.time() - last_transition_ts
135 debounce_expired = sec_since_trigger > transition.debounce_sec
136 self.log.debug(f"Checking for debounce: {sec_since_trigger=} {debounce_expired=}")
137 if debounce_expired:
138 utterance = choice(utterance_choices)
139 self.log.debug(f"Submitting speech request: {utterance}")
140 self.enqueue_speech_request(utterance)
141 else:
142 self.log.debug(f"Would have spoken, but it's only been {sec_since_trigger=}")
143 elif transition.compare(last_value) and not transition.compare(value):
144 if new_message.device == 'labrules':
145 self.log.debug(f"un-latch {transition}, so next time we change to a value that compares True we trigger again. ({last_value=} {value=})")
146 self.log.debug(f"before {self.latch_transitions=}")
147 del self.latch_transitionslatch_transitions[transition]
148 if new_message.device == 'labrules':
149 self.log.debug(f"after {self.latch_transitions=}")
150 else:
151 if new_message.device == 'labrules':
152 self.log.debug(f"Got {new_message.device}.{new_message.name} but {transition=} did not match")
153
154 def preprocess(self, speech):
155 if isinstance(speech, Recording):
156 return speech
157 speech_text = speech.markup
158 substitutables = re.findall(r"({[^}]+})", speech.markup)
159 for sub in substitutables:
160 indi_id = sub[1:-1]
161 value = self.client[indi_id]
162 if hasattr(value, 'value'):
163 value = value.value
164 self.log.debug(f"Replacing {repr(sub)} with {value=}")
165 if value is not None:
166 try:
167 value = float(value)
168 value = "{:.1f}".format(value)
169 except (TypeError, ValueError):
170 value = str(value)
171 speech_text = speech_text.replace(sub, value)
172 return SSML(speech_text)
173
174 def handle_personality_switch(self, prop : properties.IndiProperty, new_message):
175 if not isinstance(new_message, messages.IndiNewMessage):
176 return
177 active_personality = None
178 for elem in prop:
179 prop[elem] = constants.SwitchState.OFF
180 if elem in new_message and new_message[elem] is constants.SwitchState.ON:
181 active_personality = elem
182
183 if active_personality is not None:
184 self.log.info(f"Switching to {active_personality=}")
185 self.load_personality(active_personality)
187 self.update_property(prop)
188
189 def handle_soundboard_switch(self, prop: properties.IndiProperty, new_message):
190 print(new_message)
191 if not isinstance(new_message, messages.IndiNewMessage):
192 return
193 for elem in new_message:
194 if new_message[elem] is constants.SwitchState.ON:
195 srq = self.personalitypersonality.soundboard[elem]
196 self.log.info(f"Soundboard requested {srq}")
197 self.enqueue_speech_request(srq)
198 # set everything off again
199 self.update_property(prop)
200
201 def load_personality(self, personality_name):
202 personality_file = os.path.join(HERE, "personalities", f"{personality_name}.xml")
203 for cb, device_name, property_name, _ in self._cb_handles_cb_handles:
204 try:
205 self.client.unregister_callback(cb, device_name=device_name, property_name=property_name)
206 except ValueError:
207 log.exception(f"Tried to remove {cb=} {device_name=} {property_name=}")
209 if self.soundboard_sw_propsoundboard_sw_prop is not None:
210 self.delete_property(self.soundboard_sw_propsoundboard_sw_prop)
211
212 self.log.info(f"Loading personality from {personality_file}")
214
216 name="soundboard",
219 )
220 for btn_name in self.personalitypersonality.soundboard:
222 self.add_property(self.soundboard_sw_propsoundboard_sw_prop, callback=self.handle_soundboard_switch)
223
225
226 for reaction in self.personalitypersonality.reactions:
227 device_name, property_name, element_name = reaction.indi_id.split('.')
229 for t in reaction.transitions:
230 cb = partial(self.reaction_handler, element_name=element_name, transition=t, utterance_choices=reaction.transitions[t])
231 self.client.register_callback(
232 cb,
233 device_name=device_name,
234 property_name=property_name
235 )
236 self._cb_handles_cb_handles.add((cb, device_name, property_name, t))
237 self.log.debug(f"Registered reaction handler on {device_name=} {property_name=} {element_name=} using transition {t}")
238 for idx, utterance in enumerate(reaction.transitions[t]):
239 self.log.debug(f"{reaction.indi_id}: {t}: {utterance}")
240 if isinstance(utterance, SSML):
242 result = ssml_to_wav(utterance.markup, self.default_voicedefault_voice, self.api_urlapi_url, self.config.cache.path)
243 self.log.debug(f"Caching synthesis to {result}")
244 else:
245 self.log.debug(f"Cannot pre-cache because there are substitutions to be made")
246 if self.walkup_handlerwalkup_handler not in self.client.callbacks[self.observers_deviceobservers_device]['operators']:
248 if self.walkup_handlerwalkup_handler not in self.client.callbacks[self.observers_deviceobservers_device]['observers']:
250 self.active_personalityactive_personality = personality_name
251 self.telem("load_personality", {'name': personality_name})
252 self.send_all_properties()
253
254 def setup(self):
258 self._cb_handles_cb_handles = set()
260 self.last_walkuplast_walkup = {'observers': '', 'operators': ''}
261
262 while self.client.status is not constants.ConnectionStatus.CONNECTED:
263 self.log.info("Waiting for connection...")
264 time.sleep(1)
265 self.log.info("Connected.")
266 self.log.debug(f"Caching synthesis output to {self.config.cache.path}")
269
271 name="mute",
274 )
275 sv.add_element(DefSwitch(name=f"toggle", _value=constants.SwitchState.ON if self.mutemute else constants.SwitchState.OFF))
276 self.add_property(sv, callback=self.handle_mute_toggle)
277
279 name="personality",
282 )
283 for pers in self.personalities:
284 print(f"{pers=}")
286 self.add_property(sv, callback=self.handle_personality_switch)
287
288 speech_text = properties.TextVector(name="speech_text", perm=constants.PropertyPerm.READ_WRITE)
290 name="current",
291 _value=None,
292 ))
294 name="target",
295 _value=None,
296 ))
297 self.add_property(speech_text, callback=self.handle_speech_text)
298
299 speech_request = properties.SwitchVector(
300 name="speak",
302 )
303 speech_request.add_element(DefSwitch(name="request", _value=constants.SwitchState.OFF))
304 self.add_property(speech_request, callback=self.handle_speech_request)
305
306 reload_request = properties.SwitchVector(
307 name="reload_personality",
309 )
310 reload_request.add_element(DefSwitch(name="request", _value=constants.SwitchState.OFF))
311 self.add_property(reload_request, callback=self.handle_reload_request)
312
313 self.log.info("Set up complete")
314
315 def loop(self):
317 speech = self.preprocess(self._speech_requests_speech_requests.pop(0))
318 if self.mutemute:
319 self.log.debug(f"Would have said: {repr(speech)}, but muted")
320 else:
321 self.log.info(f"Speaking: {repr(speech)}")
323 self.log.debug("Speech complete")
324 self.last_utterance_tslast_utterance_ts = time.time() # update timestamp to prevent random utterances
325 if time.time() - self.last_utterance_tslast_utterance_ts > self.config.random_utterance_interval_sec and len(self.personalitypersonality.random_utterances):
326 next_utterance = choice(self.personalitypersonality.random_utterances)
327 while next_utterance == self.last_utterance_chosenlast_utterance_chosen:
328 next_utterance = choice(self.personalitypersonality.random_utterances)
331 if self.mutemute:
332 self.log.debug(f"Would have said: {repr(next_utterance)}, but muted")
333 else:
334 self.log.info(f"Randomly spouting off: {repr(next_utterance)}")
335 speak(next_utterance, self.default_voicedefault_voice, self.api_urlapi_url, self.config.cache.path)
load_personality(self, personality_name)
Definition core.py:201
handle_soundboard_switch(self, properties.IndiProperty prop, new_message)
Definition core.py:189
reaction_handler(self, new_message, element_name, transition, utterance_choices)
Definition core.py:110
handle_speech_request(self, existing_property, new_message)
Definition core.py:69
handle_mute_toggle(self, existing_property, new_message)
Definition core.py:85
AudibleAlertsConfig config
Definition core.py:37
handle_personality_switch(self, properties.IndiProperty prop, new_message)
Definition core.py:174
handle_speech_text(self, existing_property, new_message)
Definition core.py:62
handle_reload_request(self, existing_property, new_message)
Definition core.py:79
walkup_handler(self, new_message)
Definition core.py:95
drop_xml_tags(raw_xml)
Definition core.py:25
contains_substitutions(text)
Definition core.py:33