API
 
Loading...
Searching...
No Matches
core.py
Go to the documentation of this file.
1import sys
2from typing import Optional, Union
3from random import choice
4import time
5from functools import partial
6from enum import Enum
7import logging
8import os
9import os.path
10import pprint
11import re
12import xconf
13from purepyindi2 import device, properties, constants, messages
14from purepyindi2.messages import DefNumber, DefSwitch, DefText
15from magaox.indi.device import XDevice, BaseConfig
16
17from .personality import Personality, Transition, Operation, SSML, Recording
18from .opentts_bridge import speak, ssml_to_wav
19
log = logging.getLogger(__name__)
# Directory containing this module; used as the base for files shipped next to it.
HERE = os.path.dirname(__file__)
# Non-greedy match of everything between a '<' and the next '>' on the same line.
TAGS_RE = re.compile(r'<.*?>')


def drop_xml_tags(raw_xml):
    """Return ``raw_xml`` with every ``<...>`` span removed.

    This is a plain regular-expression strip, not an XML parse: the text
    between tags is kept as-is and entities are not decoded.
    """
    return TAGS_RE.sub('', raw_xml)
26
@xconf.config
class AudibleAlertsConfig(BaseConfig):
    """Configuration fields specific to the audible alerts device.

    Extends ``BaseConfig`` with the idle interval that gates random
    utterances and the directory handed to the speech synthesis helpers as
    their cache path.
    """
    random_utterance_interval_sec : Union[float, int] = xconf.field(
        default=15 * 60,
        help="Seconds since last (real or random) utterance before a random utterance should play",
    )
    cache : xconf.DirectoryConfig = xconf.field(
        default=xconf.DirectoryConfig(path="/tmp/audibleAlerts_cache"),
    )
31
def contains_substitutions(text):
    """Return True when ``text`` contains a ``{`` or a ``}`` character.

    Either brace on its own counts; the braces are not required to be
    balanced or to enclose anything.
    """
    return '{' in text or '}' in text
34
class AudibleAlerts(XDevice):
    """Device that queues and speaks utterances in response to property changes and operator requests.

    The names below are class-level declarations and defaults.
    """
    config : AudibleAlertsConfig
    personality : Personality
    # (callback, device name, property name, transition) tuples for registered reaction handlers
    _cb_handles : set
    # pending utterances, consumed from the front
    _speech_requests : list[Union[SSML, Recording]]
    # switch vector exposing the personality's soundboard buttons; None until one exists
    soundboard_sw_prop : properties.SwitchVector = None
    default_voice : str = "coqui-tts:en_ljspeech"  # overridden by personality when loaded
    # names offered as elements of the "personality" switch vector
    personalities : list[str] = ['default', 'lab_mode',]
    active_personality : str = "default"
    # base URL passed to the speech synthesis helpers
    api_url : str = "http://localhost:5500/"
    # when True, utterances are logged instead of spoken
    mute : bool = False
    latch_transitions : dict[Transition, constants.AnyIndiValue]  # store last value when triggering a transition so subsequent messages don't trigger too
    # per-transition timestamp compared against the transition's debounce_sec
    per_transition_cooldown_ts : dict[Transition, float]
    # time.time() of the most recent utterance; gates random utterances
    last_utterance_ts : float = 0
    last_utterance_chosen : Optional[str] = None
50
# NOTE(review): this method is incomplete in this listing. Its `def` line (listing
# line 51) and listing line 55 are absent, so the signature and whatever follows the
# duplicate check are not visible here. Callers elsewhere in the file invoke it as
# `self.enqueue_speech_request(<utterance>)`.
# Visible logic: when an equal request is already pending, log a warning and return
# without doing anything else.
# NOTE(review): `self._speech_requests_speech_requests` looks like an extraction
# artifact of `self._speech_requests` (the attribute declared on the class) - confirm
# against upstream.
# NOTE(review): `Logger.warn` is a deprecated alias of `Logger.warning`.
52 if any(sr == x for x in self._speech_requests_speech_requests):
53 log.warn(f"Duplicated speech request while already enqueued: {sr}")
54 return
56
def handle_speech_text(self, existing_property, new_message):
    """Callback for the ``speech_text`` property: store newly requested text.

    When the message carries a ``target`` that differs from the property's
    ``current`` value, both ``current`` and ``target`` are set to it. The
    property is then sent back out via ``update_property``.

    Parameters
    ----------
    existing_property
        The ``speech_text`` property; indexed by element name.
    new_message
        The incoming message; checked for a ``target`` element.
    """
    if 'target' in new_message and new_message['target'] != existing_property['current']:
        self.log.debug(f"Setting new speech text: {new_message['target']}")
        existing_property['current'] = new_message['target']
        existing_property['target'] = new_message['target']
    # NOTE(review): indentation was lost in the listing this was restored from; the
    # update is placed outside the `if` so the client always gets a reply - confirm.
    self.update_property(existing_property)
63
# Callback registered for the "speak" switch vector. When its `request` element is
# switched ON and the current `speech_text` value is not blank, the text is wrapped in
# <speak>...</speak> and a "speech_request" telemetry record is emitted.
# NOTE(review): listing line 70 is missing from this extract. `st` is built on line 69
# but the statement that consumes it is not visible (presumably it enqueues the
# utterance - confirm against upstream).
64 def handle_speech_request(self, existing_property, new_message):
65 self.log.debug(f"{new_message['request']=}")
66 if new_message['request'] is constants.SwitchState.ON:
67 current_text = self.properties['speech_text']['current']
68 if current_text is not None and len(current_text.strip()) != 0:
69 st = '<speak>' + current_text + '</speak>'
71 self.telem("speech_request", {"text": current_text})
72 self.update_property(existing_property) # ensure the request switch turns back off at the client
73
# Callback registered for the "reload_personality" switch vector. When its `request`
# element is switched ON, a "reload_personality" telemetry record naming the active
# personality is emitted; the property is then sent back out.
# NOTE(review): listing line 77 is missing from this extract, so the statement that
# performs the reload itself is not visible here (presumably a call to
# load_personality - confirm against upstream).
# NOTE(review): `self.active_personalityactive_personality` looks like an extraction
# artifact of `self.active_personality` - confirm against upstream.
74 def handle_reload_request(self, existing_property, new_message):
75 if new_message['request'] is constants.SwitchState.ON:
76 self.telem("reload_personality", {"name": self.active_personalityactive_personality})
78 self.update_property(existing_property) # ensure the request switch turns back off at the client
79
def handle_mute_toggle(self, existing_property, new_message):
    """Callback for the ``mute`` switch vector: record the requested mute state.

    Copies the ``toggle`` element from the message onto the property, sets
    ``self.mute`` to True exactly when that element is ``SwitchState.ON``,
    sends the property back out, and emits a ``mute_toggle`` telemetry
    record.

    Parameters
    ----------
    existing_property
        The ``mute`` property; its ``toggle`` element is overwritten.
    new_message
        The incoming message; must contain a ``toggle`` element.
    """
    existing_property['toggle'] = new_message['toggle']
    # Identity comparison: only the ON enum member mutes.
    self.mute = new_message['toggle'] is constants.SwitchState.ON
    if self.mute:
        log.debug("Muted")
    else:
        log.debug("Unmuted")
    self.update_property(existing_property)
    self.telem("mute_toggle", {"mute": self.mute})
89
def reaction_handler(self, new_message, element_name, transition, utterance_choices):
    """Decide whether a property update should trigger one of a reaction's utterances.

    Only ``IndiSetMessage`` updates that contain ``element_name`` are
    considered. The handler latches the value that triggered a transition so
    repeated messages with a still-matching value do not fire again, and
    un-latches once the value stops matching.

    Parameters
    ----------
    new_message
        Incoming message for the watched property.
    element_name
        Name of the element within the message to evaluate.
    transition
        Object providing ``compare(value)``, ``op`` and ``debounce_sec``;
        also used as the key into the latch and cooldown dictionaries.
    utterance_choices
        Sequence of utterances; one is picked at random when firing.
    """
    if not isinstance(new_message, messages.IndiSetMessage):
        return
    if element_name not in new_message:
        return
    value = new_message[element_name]
    self.log.debug(f"Judging reaction for {element_name} change to {repr(value)} using {transition}")
    self.log.debug(f"before check {self.latch_transitions=}")
    last_value = self.latch_transitions.get(transition)
    self.log.debug(f"{new_message}\n{transition.compare(value)=}, last value was {last_value}, {value != last_value=} {(not transition.compare(last_value))=}")
    matches_now = transition.compare(value)
    matched_before = transition.compare(last_value)
    if matches_now and (
        # if there's no operation, we fire on any change,
        # but make sure it's actually a change
        (transition.op is None and value != last_value) or
        # don't fire if we already compared true on the last value:
        (not matched_before)
    ):
        self.latch_transitions[transition] = value
        self.log.debug(f"after update {self.latch_transitions=}")
        self.log.debug(f"latched {transition=} with {value=}")
        now = time.time()
        last_transition_ts = self.per_transition_cooldown_ts.get(transition, 0)
        sec_since_trigger = now - last_transition_ts
        debounce_expired = sec_since_trigger > transition.debounce_sec
        self.log.debug(f"Checking for debounce: {sec_since_trigger=} {debounce_expired=}")
        if debounce_expired:
            # Remember when this transition last spoke; without this the stored
            # timestamp stays at its default and the debounce can never apply.
            self.per_transition_cooldown_ts[transition] = now
            utterance = choice(utterance_choices)
            self.log.debug(f"Submitting speech request: {utterance}")
            self.enqueue_speech_request(utterance)
        else:
            self.log.debug(f"Would have spoken, but it's only been {sec_since_trigger=}")
    elif matched_before and not matches_now:
        self.log.debug(f"un-latch {transition}, so next time we change to a value that compares True we trigger again. ({last_value=} {value=})")
        self.log.debug(f"before {self.latch_transitions=}")
        # pop() rather than del: compare(None) may be true when nothing was latched,
        # in which case there is no key to remove.
        self.latch_transitions.pop(transition, None)
        self.log.debug(f"after {self.latch_transitions=}")
    else:
        self.log.debug(f"Got {new_message.device}.{new_message.name} but {transition=} did not match")
127
def preprocess(self, speech):
    """Fill ``{...}`` placeholders in an utterance with values looked up on the client.

    ``Recording`` instances are returned unchanged. Otherwise every
    ``{key}`` span in ``speech.markup`` is looked up as ``self.client[key]``
    (unwrapping a ``.value`` attribute when present) and substituted into the
    text: values that convert to ``float`` are rendered with one decimal
    place, anything else with ``str()``. Placeholders whose value is ``None``
    are left in the text untouched.

    Returns
    -------
    A new ``SSML`` built from the substituted text, or the original
    ``Recording``.
    """
    if isinstance(speech, Recording):
        return speech
    speech_text = speech.markup
    substitutables = re.findall(r"({[^}]+})", speech.markup)
    for sub in substitutables:
        indi_id = sub[1:-1]  # strip the surrounding braces
        value = self.client[indi_id]
        if hasattr(value, 'value'):
            value = value.value
        self.log.debug(f"Replacing {repr(sub)} with {value=}")
        if value is not None:
            try:
                value = float(value)
                value = "{:.1f}".format(value)
            except (TypeError, ValueError):
                value = str(value)
            # NOTE(review): indentation was lost in the listing this was restored
            # from; the replace is kept inside the None check because
            # str.replace(sub, None) would raise TypeError - confirm against upstream.
            speech_text = speech_text.replace(sub, value)
    return SSML(speech_text)
147
# Callback registered for the "personality" switch vector. Messages that are not
# IndiNewMessage instances are ignored. Every element of the property is set OFF while
# remembering which element (if any) the message switches ON; when one is found it is
# logged and passed to load_personality. The property is sent back out at the end.
# NOTE(review): listing line 160 is missing from this extract, so one statement
# between the load_personality call and update_property is not visible here.
148 def handle_personality_switch(self, prop : properties.IndiProperty, new_message):
149 if not isinstance(new_message, messages.IndiNewMessage):
150 return
151 active_personality = None
152 for elem in prop:
153 prop[elem] = constants.SwitchState.OFF
154 if elem in new_message and new_message[elem] is constants.SwitchState.ON:
155 active_personality = elem
156
157 if active_personality is not None:
158 self.log.info(f"Switching to {active_personality=}")
159 self.load_personality(active_personality)
161 self.update_property(prop)
162
def handle_soundboard_switch(self, prop: properties.IndiProperty, new_message):
    """Callback for the ``soundboard`` switch vector: enqueue the requested sounds.

    Messages that are not ``IndiNewMessage`` instances are ignored. For every
    element the message switches ON, the matching entry of
    ``self.personality.soundboard`` is enqueued as a speech request. The
    property itself is never modified here; it is sent back out unchanged.

    Parameters
    ----------
    prop
        The soundboard switch property.
    new_message
        The incoming message; iterated for element names.
    """
    # Logged through the device logger instead of a bare print to stdout.
    self.log.debug(f"Soundboard switch message: {new_message}")
    if not isinstance(new_message, messages.IndiNewMessage):
        return
    for elem in new_message:
        if new_message[elem] is constants.SwitchState.ON:
            srq = self.personality.soundboard[elem]
            self.log.info(f"Soundboard requested {srq}")
            self.enqueue_speech_request(srq)
    # set everything off again
    self.update_property(prop)
174
# Switch the device to the personality named `personality_name`.
# Visible steps: build the path <HERE>/personalities/<name>.xml, unregister every
# callback recorded in the callback-handle set, delete the existing soundboard property
# if there is one, register a reaction_handler partial for each transition of each
# reaction, attempt to pre-synthesize SSML utterances via ssml_to_wav, then record the
# active personality, emit "load_personality" telemetry and resend all properties.
# NOTE(review): this listing is missing lines 182, 187, 189, 191-192, 195, 198, 202
# and 215. Among other things the statement that parses the personality file, the
# opening of the soundboard SwitchVector construction, the body of the soundboard
# button loop, and the condition that separates the two pre-cache branches are not
# visible here - do not treat what is shown as the full method.
# NOTE(review): doubled identifiers such as `self._cb_handles_cb_handles`,
# `self.soundboard_sw_propsoundboard_sw_prop`, `self.personalitypersonality`,
# `self.default_voicedefault_voice`, `self.api_urlapi_url` and
# `self.active_personalityactive_personality` look like extraction artifacts of the
# single attribute names declared on the class - confirm against upstream.
175 def load_personality(self, personality_name):
176 personality_file = os.path.join(HERE, "personalities", f"{personality_name}.xml")
# Drop the callbacks registered for the previous personality; the fourth tuple member
# (the transition) is not needed for unregistering.
177 for cb, device_name, property_name, _ in self._cb_handles_cb_handles:
178 try:
179 self.client.unregister_callback(cb, device_name=device_name, property_name=property_name)
180 except ValueError:
181 log.exception(f"Tried to remove {cb=} {device_name=} {property_name=}")
183 if self.soundboard_sw_propsoundboard_sw_prop is not None:
184 self.delete_property(self.soundboard_sw_propsoundboard_sw_prop)
185
186 self.log.info(f"Loading personality from {personality_file}")
188
190 name="soundboard",
193 )
194 for btn_name in self.personalitypersonality.soundboard:
196 self.add_property(self.soundboard_sw_propsoundboard_sw_prop, callback=self.handle_soundboard_switch)
197
199
# Each reaction's indi_id is split on '.' into exactly three parts.
200 for reaction in self.personalitypersonality.reactions:
201 device_name, property_name, element_name = reaction.indi_id.split('.')
203 for t in reaction.transitions:
# One partial per transition, binding the element, the transition and its utterances.
204 cb = partial(self.reaction_handler, element_name=element_name, transition=t, utterance_choices=reaction.transitions[t])
205 self.client.register_callback(
206 cb,
207 device_name=device_name,
208 property_name=property_name
209 )
# Remember the registration so it can be removed on the next personality load.
210 self._cb_handles_cb_handles.add((cb, device_name, property_name, t))
211 self.log.debug(f"Registered reaction handler on {device_name=} {property_name=} {element_name=} using transition {t}")
212 for idx, utterance in enumerate(reaction.transitions[t]):
213 self.log.debug(f"{reaction.indi_id}: {t}: {utterance}")
214 if isinstance(utterance, SSML):
216 result = ssml_to_wav(utterance.markup, self.default_voicedefault_voice, self.api_urlapi_url, self.config.cache.path)
217 self.log.debug(f"Caching synthesis to {result}")
218 else:
219 self.log.debug(f"Cannot pre-cache because there are substitutions to be made")
220 self.active_personalityactive_personality = personality_name
221 self.telem("load_personality", {'name': personality_name})
222 self.send_all_properties()
223
# One-time device setup.
# Visible steps: create the empty callback-handle set, block (polling once per second)
# until the client reports CONNECTED, then define and register the properties "mute",
# "personality", "speech_text", "speak" and "reload_personality" with their callbacks.
# NOTE(review): this listing is missing lines 225-227, 229, 236-237, 239, 241-242, 247,
# 249-250, 254, 258, 262, 270 and 277. Initialisation of the other per-instance
# containers, the openings of the "mute" and "personality" SwitchVector constructions,
# the body of the personalities loop, the add_element calls for the text vector and
# any initial personality load are not visible here.
# NOTE(review): `self._cb_handles_cb_handles` and `self.mutemute` look like extraction
# artifacts of `self._cb_handles` and `self.mute` - confirm against upstream.
224 def setup(self):
228 self._cb_handles_cb_handles = set()
230
231 while self.client.status is not constants.ConnectionStatus.CONNECTED:
232 self.log.info("Waiting for connection...")
233 time.sleep(1)
234 self.log.info("Connected.")
235 self.log.debug(f"Caching synthesis output to {self.config.cache.path}")
238
240 name="mute",
243 )
# The initial switch state mirrors the current value of the mute attribute.
244 sv.add_element(DefSwitch(name=f"toggle", _value=constants.SwitchState.ON if self.mutemute else constants.SwitchState.OFF))
245 self.add_property(sv, callback=self.handle_mute_toggle)
246
248 name="personality",
251 )
252 for pers in self.personalities:
# NOTE(review): bare print to stdout; the rest of the method uses self.log.
253 print(f"{pers=}")
255 self.add_property(sv, callback=self.handle_personality_switch)
256
257 speech_text = properties.TextVector(name="speech_text", perm=constants.PropertyPerm.READ_WRITE)
259 name="current",
260 _value=None,
261 ))
263 name="target",
264 _value=None,
265 ))
266 self.add_property(speech_text, callback=self.handle_speech_text)
267
268 speech_request = properties.SwitchVector(
269 name="speak",
271 )
272 speech_request.add_element(DefSwitch(name="request", _value=constants.SwitchState.OFF))
273 self.add_property(speech_request, callback=self.handle_speech_request)
274
275 reload_request = properties.SwitchVector(
276 name="reload_personality",
278 )
279 reload_request.add_element(DefSwitch(name="request", _value=constants.SwitchState.OFF))
280 self.add_property(reload_request, callback=self.handle_reload_request)
281
282 self.log.info("Set up complete")
283
# Body of the device's loop() method.
# Visible steps: take the request at the front of the pending list, run it through
# preprocess, and either log it (when muted) or announce and speak it; refresh the
# last-utterance timestamp; then, if more than config.random_utterance_interval_sec has
# passed since the last utterance and the personality has random utterances, pick one
# that differs from the previously chosen one and speak it unless muted.
# NOTE(review): this listing is missing lines 285, 291 and 298-299. The condition that
# guards the pop(0), the call that actually speaks the queued utterance, and the two
# statements after the re-pick loop are not visible here.
# NOTE(review): doubled identifiers such as `self._speech_requests_speech_requests`,
# `self.mutemute`, `self.last_utterance_tslast_utterance_ts`,
# `self.personalitypersonality`, `self.last_utterance_chosenlast_utterance_chosen`,
# `self.default_voicedefault_voice` and `self.api_urlapi_url` look like extraction
# artifacts of the single attribute names declared on the class - confirm upstream.
284 def loop(self):
286 speech = self.preprocess(self._speech_requests_speech_requests.pop(0))
287 if self.mutemute:
288 self.log.debug(f"Would have said: {repr(speech)}, but muted")
289 else:
290 self.log.info(f"Speaking: {repr(speech)}")
292 self.log.debug("Speech complete")
293 self.last_utterance_tslast_utterance_ts = time.time() # update timestamp to prevent random utterances
294 if time.time() - self.last_utterance_tslast_utterance_ts > self.config.random_utterance_interval_sec and len(self.personalitypersonality.random_utterances):
295 next_utterance = choice(self.personalitypersonality.random_utterances)
# NOTE(review): if the personality has exactly one random utterance and it equals the
# previously chosen one, this re-pick loop cannot terminate - verify against upstream.
296 while next_utterance == self.last_utterance_chosenlast_utterance_chosen:
297 next_utterance = choice(self.personalitypersonality.random_utterances)
300 if self.mutemute:
301 self.log.debug(f"Would have said: {repr(next_utterance)}, but muted")
302 else:
303 self.log.info(f"Randomly spouting off: {repr(next_utterance)}")
304 speak(next_utterance, self.default_voicedefault_voice, self.api_urlapi_url, self.config.cache.path)
load_personality(self, personality_name)
Definition core.py:175
handle_soundboard_switch(self, properties.IndiProperty prop, new_message)
Definition core.py:163
reaction_handler(self, new_message, element_name, transition, utterance_choices)
Definition core.py:90
handle_speech_request(self, existing_property, new_message)
Definition core.py:64
handle_mute_toggle(self, existing_property, new_message)
Definition core.py:80
AudibleAlertsConfig config
Definition core.py:36
handle_personality_switch(self, properties.IndiProperty prop, new_message)
Definition core.py:148
handle_speech_text(self, existing_property, new_message)
Definition core.py:57
handle_reload_request(self, existing_property, new_message)
Definition core.py:74
drop_xml_tags(raw_xml)
Definition core.py:24
contains_substitutions(text)
Definition core.py:32