API
core.py
Go to the documentation of this file.
1 import sys
2 from typing import Optional, Union
3 from random import choice
4 import time
5 from functools import partial
6 from enum import Enum
7 import logging
8 import os
9 import os.path
10 import pprint
11 import re
12 import xconf
13 from purepyindi2 import device, properties, constants, messages
14 from purepyindi2.messages import DefNumber, DefSwitch, DefText
15 from magaox.indi.device import XDevice, BaseConfig
16 
17 from .personality import Personality, Transition, Operation, SSML, Recording
18 from .opentts_bridge import speak, ssml_to_wav
19 
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Directory containing this file; personality XML files are looked up relative to it.
HERE = os.path.dirname(__file__)
# Non-greedy match for anything that looks like an XML/SSML tag (``<...>``).
TAGS_RE = re.compile(r'<.*?>')
23 
def drop_xml_tags(raw_xml):
    """Return *raw_xml* with every ``<...>`` span matched by ``TAGS_RE`` removed."""
    return TAGS_RE.sub('', raw_xml)
26 
# Configuration for the AudibleAlerts device: how long to stay quiet before a
# random utterance, and where synthesized audio is cached.
@xconf.config
class AudibleAlertsConfig(BaseConfig):
    # Idle time (seconds) after the last utterance before a random one may play.
    random_utterance_interval_sec : Union[float, int] = xconf.field(
        default=15 * 60,
        help="Seconds since last (real or random) utterance before a random utterance should play",
    )
    # Directory handed to the TTS bridge as its cache location.
    cache : xconf.DirectoryConfig = xconf.field(
        default=xconf.DirectoryConfig(path="/tmp/audibleAlerts_cache"),
    )
31 
def contains_substitutions(text):
    """Return True when *text* contains a ``{`` or ``}`` character.

    Braces mark ``{...}`` placeholders that are filled in at speech time, so
    text containing them cannot be synthesized ahead of time.
    """
    return '{' in text or '}' in text
34 
class AudibleAlerts(XDevice):
    """INDI device that speaks alerts in reaction to property changes.

    A "personality" file defines reactions (INDI element + transition ->
    candidate utterances), soundboard buttons and random utterances.
    Callbacks enqueue speech requests; ``loop()`` drains the queue through
    the TTS bridge and occasionally plays a random utterance when idle.
    """
    config : AudibleAlertsConfig
    personality : Personality
    _cb_handles : set
    _speech_requests : list[Union[SSML, Recording]]
    soundboard_sw_prop : Optional[properties.SwitchVector] = None
    default_voice : str = "coqui-tts:en_ljspeech"  # overridden by personality when loaded
    personalities : list[str] = ['default', 'lab_mode',]
    active_personality : str = "default"
    api_url : str = "http://localhost:5500/"
    mute : bool = False
    latch_transitions : dict[Transition, constants.AnyIndiValue]  # store last value when triggering a transition so subsequent messages don't trigger too
    per_transition_cooldown_ts : dict[Transition, float]  # time.time() of the last utterance enqueued per transition
    last_utterance_ts : float = 0
    last_utterance_chosen : Optional[str] = None

    def enqueue_speech_request(self, sr):
        """Append *sr* to the speech queue unless an equal request is already waiting."""
        if sr in self._speech_requests:
            log.warning(f"Duplicated speech request while already enqueued: {sr}")
            return
        self._speech_requests.append(sr)

    def handle_speech_text(self, existing_property, new_message):
        """Property callback: store a newly requested speech text and publish it."""
        if 'target' in new_message and new_message['target'] != existing_property['current']:
            self.log.debug(f"Setting new speech text: {new_message['target']}")
            existing_property['current'] = new_message['target']
            existing_property['target'] = new_message['target']
        self.update_property(existing_property)

    def handle_speech_request(self, existing_property, new_message):
        """Property callback: enqueue the current ``speech_text`` when ``request`` is switched on."""
        self.log.debug(f"{new_message['request']=}")
        if new_message['request'] is constants.SwitchState.ON:
            current_text = self.properties['speech_text']['current']
            # Ignore unset or whitespace-only text.
            if current_text is not None and len(current_text.strip()) != 0:
                st = '<speak>' + current_text + '</speak>'
                self.enqueue_speech_request(SSML(st))
                self.telem("speech_request", {"text": current_text})
        self.update_property(existing_property)  # ensure the request switch turns back off at the client

    def handle_reload_request(self, existing_property, new_message):
        """Property callback: reload the active personality when ``request`` is switched on."""
        if new_message['request'] is constants.SwitchState.ON:
            self.telem("reload_personality", {"name": self.active_personality})
            self.load_personality(self.active_personality)
        self.update_property(existing_property)  # ensure the request switch turns back off at the client

    def handle_mute_toggle(self, existing_property, new_message):
        """Property callback: mirror the ``toggle`` switch into ``self.mute``."""
        existing_property['toggle'] = new_message['toggle']
        self.mute = new_message['toggle'] is constants.SwitchState.ON
        if self.mute:
            log.debug("Muted")
        else:
            log.debug("Unmuted")
        self.update_property(existing_property)
        self.telem("mute_toggle", {"mute": self.mute})

    def reaction_handler(self, new_message, element_name, transition, utterance_choices):
        """Client callback: decide whether a property update should trigger speech.

        Only ``IndiSetMessage`` updates that contain *element_name* are judged.
        When *transition* newly matches the value, the value is latched and one
        of *utterance_choices* is enqueued, unless the same transition spoke
        less than ``transition.debounce_sec`` seconds ago.  The latch is
        released once the value stops matching, so the next match fires again.
        """
        if not isinstance(new_message, messages.IndiSetMessage):
            return
        if element_name not in new_message:
            return
        value = new_message[element_name]
        self.log.debug(f"Judging reaction for {element_name} change to {repr(value)} using {transition}")
        last_value = self.latch_transitions.get(transition)
        self.log.debug(f"{new_message}\n{transition.compare(value)=}, last value was {last_value}, {value != last_value=} {(not transition.compare(last_value))=}")
        if transition.compare(value) and (
            # if there's no operation, we fire on any change,
            # but make sure it's actually a change
            (transition.op is None and value != last_value) or
            # don't fire if we already compared true on the last value:
            (not transition.compare(last_value))
        ):
            self.latch_transitions[transition] = value
            now = time.time()
            last_transition_ts = self.per_transition_cooldown_ts.get(transition, 0)
            sec_since_trigger = now - last_transition_ts
            debounce_expired = sec_since_trigger > transition.debounce_sec
            self.log.debug(f"Debounced {sec_since_trigger=}")
            if debounce_expired:
                utterance = choice(utterance_choices)
                self.log.debug(f"Submitting speech request: {utterance}")
                self.enqueue_speech_request(utterance)
                # Start the cooldown window; without this the debounce never engages.
                self.per_transition_cooldown_ts[transition] = now
            else:
                self.log.debug(f"Would have talked, but it's only been {sec_since_trigger=}")
        elif transition.compare(last_value) and not transition.compare(value):
            # un-latch, so next time we change to a value that compares True we trigger again:
            # (pop() rather than del: nothing may be latched yet)
            self.latch_transitions.pop(transition, None)
        else:
            self.log.debug(f"Got {new_message.device}.{new_message.name} but {transition=} did not match")

    def preprocess(self, speech):
        """Fill ``{indi_id}`` placeholders in SSML markup with current client values.

        ``Recording`` requests are returned untouched.  Values that convert to
        float are formatted with one decimal place; others are stringified.
        A placeholder whose value is ``None`` is left in the text as-is.
        """
        if isinstance(speech, Recording):
            return speech
        speech_text = speech.markup
        substitutables = re.findall(r"({[^}]+})", speech.markup)
        for sub in substitutables:
            indi_id = sub[1:-1]  # strip the surrounding braces
            value = self.client[indi_id]
            if hasattr(value, 'value'):
                value = value.value
            self.log.debug(f"Replacing {repr(sub)} with {value=}")
            if value is not None:
                try:
                    value = float(value)
                    value = "{:.1f}".format(value)
                except (TypeError, ValueError):
                    value = str(value)
                speech_text = speech_text.replace(sub, value)
        return SSML(speech_text)

    def handle_personality_switch(self, prop : properties.IndiProperty, new_message):
        """Property callback: load the personality whose switch was turned on."""
        if not isinstance(new_message, messages.IndiNewMessage):
            return
        active_personality = None
        for elem in prop:
            prop[elem] = constants.SwitchState.OFF
            if elem in new_message and new_message[elem] is constants.SwitchState.ON:
                active_personality = elem

        if active_personality is not None:
            self.log.info(f"Switching to {active_personality=}")
            self.load_personality(active_personality)
        # Every element was cleared above, so re-assert whichever personality is active now.
        prop[self.active_personality] = constants.SwitchState.ON
        self.update_property(prop)

    def handle_soundboard_switch(self, prop: properties.IndiProperty, new_message):
        """Property callback: enqueue the soundboard entry for each switch turned on."""
        self.log.debug(f"Soundboard switch message: {new_message}")
        if not isinstance(new_message, messages.IndiNewMessage):
            return
        for elem in new_message:
            if new_message[elem] is constants.SwitchState.ON:
                srq = self.personality.soundboard[elem]
                self.log.info(f"Soundboard requested {srq}")
                self.enqueue_speech_request(srq)
        # set everything off again
        self.update_property(prop)

    def load_personality(self, personality_name):
        """Load ``personalities/<personality_name>.xml`` and rewire everything to it.

        Unregisters the previous personality's reaction callbacks, rebuilds the
        ``soundboard`` switch property, registers one callback per reaction
        transition and asks the TTS bridge to synthesize every SSML utterance
        that has no ``{...}`` substitutions (presumably to warm the cache).
        """
        personality_file = os.path.join(HERE, "personalities", f"{personality_name}.xml")
        for cb, device_name, property_name in self._cb_handles:
            try:
                self.client.unregister_callback(cb, device_name=device_name, property_name=property_name)
            except ValueError:
                log.exception(f"Tried to remove {cb=} {device_name=} {property_name=}")
        self._cb_handles = set()
        if self.soundboard_sw_prop is not None:
            self.delete_property(self.soundboard_sw_prop)
            # Forget the deleted property so a failed load below cannot lead to deleting it twice.
            self.soundboard_sw_prop = None

        self.log.info(f"Loading personality from {personality_file}")
        self.personality = Personality.from_path(personality_file)

        self.soundboard_sw_prop = properties.SwitchVector(
            name="soundboard",
            rule=constants.SwitchRule.ONE_OF_MANY,
            perm=constants.PropertyPerm.READ_WRITE,
        )
        for btn_name in self.personality.soundboard:
            self.soundboard_sw_prop.add_element(DefSwitch(name=btn_name, _value=constants.SwitchState.OFF))
        self.add_property(self.soundboard_sw_prop, callback=self.handle_soundboard_switch)

        self.default_voice = self.personality.default_voice

        for reaction in self.personality.reactions:
            device_name, property_name, element_name = reaction.indi_id.split('.')
            self.client.get_properties(reaction.indi_id)
            for t in reaction.transitions:
                cb = partial(self.reaction_handler, element_name=element_name, transition=t, utterance_choices=reaction.transitions[t])
                self.client.register_callback(
                    cb,
                    device_name=device_name,
                    property_name=property_name
                )
                # Remember the handle so the next load can unregister it.
                self._cb_handles.add((cb, device_name, property_name))
                self.log.debug(f"Registered reaction handler on {device_name=} {property_name=} {element_name=} using transition {t}")
                for utterance in reaction.transitions[t]:
                    self.log.debug(f"{reaction.indi_id}: {t}: {utterance}")
                    if isinstance(utterance, SSML):
                        if not contains_substitutions(utterance.markup):
                            result = ssml_to_wav(utterance.markup, self.default_voice, self.api_url, self.config.cache.path)
                            self.log.debug(f"Caching synthesis to {result}")
                        else:
                            self.log.debug("Cannot pre-cache because there are substitutions to be made")
        self.active_personality = personality_name
        self.telem("load_personality", {'name': personality_name})

    def setup(self):
        """Initialize state, wait for the client connection, and publish properties.

        Publishes ``mute``, ``personality``, ``speech_text``, ``speak`` and
        ``reload_personality`` (``soundboard`` is added by ``load_personality``).
        """
        self.last_utterance_ts = time.time()
        self.latch_transitions = {}
        self.per_transition_cooldown_ts = {}
        self._cb_handles = set()
        self._speech_requests = []

        while self.client.status is not constants.ConnectionStatus.CONNECTED:
            self.log.info("Waiting for connection...")
            time.sleep(1)
        self.log.info("Connected.")
        self.log.debug(f"Caching synthesis output to {self.config.cache.path}")
        self.config.cache.ensure_exists()
        self.load_personality(self.active_personality)

        sv = properties.SwitchVector(
            name="mute",
            rule=constants.SwitchRule.ONE_OF_MANY,
            perm=constants.PropertyPerm.READ_WRITE,
        )
        sv.add_element(DefSwitch(name="toggle", _value=constants.SwitchState.ON if self.mute else constants.SwitchState.OFF))
        self.add_property(sv, callback=self.handle_mute_toggle)

        sv = properties.SwitchVector(
            name="personality",
            rule=constants.SwitchRule.ONE_OF_MANY,
            perm=constants.PropertyPerm.READ_WRITE,
        )
        for pers in self.personalities:
            self.log.debug(f"{pers=}")
            sv.add_element(DefSwitch(name=pers, _value=constants.SwitchState.ON if self.active_personality == pers else constants.SwitchState.OFF))
        self.add_property(sv, callback=self.handle_personality_switch)

        speech_text = properties.TextVector(name="speech_text", perm=constants.PropertyPerm.READ_WRITE)
        speech_text.add_element(DefText(
            name="current",
            _value=None,
        ))
        speech_text.add_element(DefText(
            name="target",
            _value=None,
        ))
        self.add_property(speech_text, callback=self.handle_speech_text)

        speech_request = properties.SwitchVector(
            name="speak",
            rule=constants.SwitchRule.ANY_OF_MANY,
        )
        speech_request.add_element(DefSwitch(name="request", _value=constants.SwitchState.OFF))
        self.add_property(speech_request, callback=self.handle_speech_request)

        reload_request = properties.SwitchVector(
            name="reload_personality",
            rule=constants.SwitchRule.ANY_OF_MANY,
        )
        reload_request.add_element(DefSwitch(name="request", _value=constants.SwitchState.OFF))
        self.add_property(reload_request, callback=self.handle_reload_request)

        self.log.info("Set up complete")

    def loop(self):
        """Speak all queued requests, then maybe play a random utterance.

        A random utterance is chosen when more than
        ``config.random_utterance_interval_sec`` seconds have passed since the
        last utterance and the personality defines any.  While muted, requests
        are consumed and logged but not spoken.
        """
        while self._speech_requests:
            speech = self.preprocess(self._speech_requests.pop(0))
            if self.mute:
                self.log.debug(f"Would have said: {repr(speech)}, but muted")
            else:
                self.log.info(f"Speaking: {repr(speech)}")
                speak(speech, self.default_voice, self.api_url, self.config.cache.path)
                self.log.debug("Speech complete")
            self.last_utterance_ts = time.time()  # update timestamp to prevent random utterances
        if time.time() - self.last_utterance_ts > self.config.random_utterance_interval_sec and len(self.personality.random_utterances):
            pool = self.personality.random_utterances
            # Avoid repeating the previous pick, but fall back to the full pool
            # when nothing else exists (re-drawing in a loop would never end).
            fresh = [candidate for candidate in pool if candidate != self.last_utterance_chosen]
            next_utterance = choice(fresh if fresh else pool)
            self.last_utterance_chosen = next_utterance
            self.last_utterance_ts = time.time()
            if self.mute:
                self.log.debug(f"Would have said: {repr(next_utterance)}, but muted")
            else:
                self.log.info(f"Randomly spouting off: {repr(next_utterance)}")
                speak(next_utterance, self.default_voice, self.api_url, self.config.cache.path)
def reaction_handler(self, new_message, element_name, transition, utterance_choices)
Definition: core.py:90
def handle_personality_switch(self, prop: properties.IndiProperty, new_message)
Definition: core.py:143
def handle_reload_request(self, existing_property, new_message)
Definition: core.py:74
def handle_speech_request(self, existing_property, new_message)
Definition: core.py:64
def enqueue_speech_request(self, sr)
Definition: core.py:51
def load_personality(self, personality_name)
Definition: core.py:170
def handle_speech_text(self, existing_property, new_message)
Definition: core.py:57
def handle_soundboard_switch(self, prop: properties.IndiProperty, new_message)
Definition: core.py:158
def preprocess(self, speech)
Definition: core.py:123
def handle_mute_toggle(self, existing_property, new_message)
Definition: core.py:80
def contains_substitutions(text)
Definition: core.py:32
def drop_xml_tags(raw_xml)
Definition: core.py:24
def speak(speech, default_voice, api_url, cache_dir)
def ssml_to_wav(ssml_text, default_voice, api_url, cache_dir)