API
core.py
Go to the documentation of this file.
1 import sys
2 from typing import Optional, Union
3 from random import choice
4 import time
5 from functools import partial
6 from enum import Enum
7 import logging
8 import os
9 import os.path
10 import pprint
11 import re
12 import xconf
13 from purepyindi2 import device, properties, constants, messages
14 from purepyindi2.messages import DefNumber, DefSwitch, DefText
15 from magaox.indi.device import XDevice, BaseConfig
16 
17 from .personality import Personality, Transition, Operation, SSML, Recording
18 from .opentts_bridge import speak, ssml_to_wav
19 
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Directory containing this file; personality XML files are looked up
# relative to it.
HERE = os.path.dirname(__file__)
# Non-greedy match of anything between a '<' and the next '>'.
TAGS_RE = re.compile('<.*?>')


def drop_xml_tags(raw_xml):
    """Return ``raw_xml`` with every ``<...>`` span removed.

    This is a plain regex substitution, not an XML parse: each shortest
    run of characters (excluding newlines) enclosed in angle brackets is
    deleted and the remaining text is returned unchanged.
    """
    return TAGS_RE.sub('', raw_xml)
26 
@xconf.config
class AudibleAlertsConfig(BaseConfig):
    # Configuration for the AudibleAlerts device.

    # Idle time (seconds) after the last utterance before a random one plays.
    random_utterance_interval_sec : Union[float, int] = xconf.field(default=15 * 60, help="Seconds since last (real or random) utterance before a random utterance should play")
    # Directory where synthesized audio is cached.
    cache : xconf.DirectoryConfig = xconf.field(default=xconf.DirectoryConfig(path="/tmp/audibleAlerts_cache"))
31 
def contains_substitutions(text):
    """Return True if ``text`` contains a ``{`` or ``}`` character.

    Braces mark ``{device.property.element}`` placeholders that are filled
    in at speech time, so text containing them cannot be synthesized ahead
    of time.
    """
    return '{' in text or '}' in text
34 
class AudibleAlerts(XDevice):
    """INDI device that turns property changes into spoken alerts.

    A *personality* (loaded from an XML file next to this module) defines
    reactions to INDI property transitions, a soundboard of on-demand
    clips, and random utterances spoken after a period of silence.
    Speech requests are queued by the INDI callbacks and spoken from
    ``loop``.
    """
    config : AudibleAlertsConfig
    personality : Personality
    # (callback, device_name, property_name, transition) tuples for every
    # reaction callback registered on the client, kept so they can be
    # unregistered when the personality is reloaded.
    _cb_handles : set
    # Pending speech requests, consumed in FIFO order by ``loop``.
    _speech_requests : list[Union[SSML, Recording]]
    soundboard_sw_prop : Optional[properties.SwitchVector] = None
    default_voice : str = "coqui-tts:en_ljspeech"  # overridden by personality when loaded
    personalities : list[str] = ['default', 'lab_mode',]
    active_personality : str = "default"
    api_url : str = "http://localhost:5500/"
    mute : bool = False
    latch_transitions : dict[Transition, constants.AnyIndiValue]  # store last value when triggering a transition so subsequent messages don't trigger too
    # Time (``time.time()``) each transition last produced an utterance.
    per_transition_cooldown_ts : dict[Transition, float]
    last_utterance_ts : float = 0
    last_utterance_chosen : Optional[str] = None

    def enqueue_speech_request(self, sr):
        """Queue ``sr`` to be spoken, unless an equal request is already queued."""
        if any(sr == x for x in self._speech_requests):
            self.log.warning(f"Duplicated speech request while already enqueued: {sr}")
            return
        self._speech_requests.append(sr)

    def handle_speech_text(self, existing_property, new_message):
        """INDI callback: store new free-form text for the ``speak`` request."""
        if 'target' in new_message and new_message['target'] != existing_property['current']:
            self.log.debug(f"Setting new speech text: {new_message['target']}")
            existing_property['current'] = new_message['target']
            existing_property['target'] = new_message['target']
        self.update_property(existing_property)

    def handle_speech_request(self, existing_property, new_message):
        """INDI callback: enqueue the current ``speech_text`` when requested."""
        self.log.debug(f"{new_message['request']=}")
        if new_message['request'] is constants.SwitchState.ON:
            current_text = self.properties['speech_text']['current']
            # Ignore unset or whitespace-only text.
            if current_text is not None and len(current_text.strip()) != 0:
                st = '<speak>' + current_text + '</speak>'
                self.enqueue_speech_request(SSML(st))
                self.telem("speech_request", {"text": current_text})
        self.update_property(existing_property)  # ensure the request switch turns back off at the client

    def handle_reload_request(self, existing_property, new_message):
        """INDI callback: reload the active personality from disk when requested."""
        if new_message['request'] is constants.SwitchState.ON:
            self.telem("reload_personality", {"name": self.active_personality})
            self.load_personality(self.active_personality)
        self.update_property(existing_property)  # ensure the request switch turns back off at the client

    def handle_mute_toggle(self, existing_property, new_message):
        """INDI callback: mirror the ``toggle`` switch into ``self.mute``."""
        existing_property['toggle'] = new_message['toggle']
        self.mute = new_message['toggle'] is constants.SwitchState.ON
        if self.mute:
            self.log.debug("Muted")
        else:
            self.log.debug("Unmuted")
        self.update_property(existing_property)
        self.telem("mute_toggle", {"mute": self.mute})

    def reaction_handler(self, new_message, element_name, transition, utterance_choices):
        """Client callback: decide whether an element change triggers speech.

        Only ``IndiSetMessage`` updates containing ``element_name`` are
        considered. A transition fires when the new value satisfies it and
        the previously latched value did not (or, for transitions with no
        operation, when the value actually changed). Firing latches the
        value; a later non-matching value un-latches it so the transition
        can fire again. An utterance is only enqueued if more than
        ``transition.debounce_sec`` seconds have passed since this
        transition last produced one.
        """
        if not isinstance(new_message, messages.IndiSetMessage):
            return
        if element_name not in new_message:
            return
        value = new_message[element_name]
        self.log.debug(f"Judging reaction for {element_name} change to {repr(value)} using {transition}")
        self.log.debug(f"before check {self.latch_transitions=}")
        last_value = self.latch_transitions.get(transition)
        self.log.debug(f"{new_message}\n{transition.compare(value)=}, last value was {last_value}, {value != last_value=} {(not transition.compare(last_value))=}")
        matches_now = transition.compare(value)
        matched_before = transition.compare(last_value)
        if matches_now and (
            # if there's no operation, we fire on any change,
            # but make sure it's actually a change
            (transition.op is None and value != last_value) or
            # don't fire if we already compared true on the last value:
            (not matched_before)
        ):
            self.latch_transitions[transition] = value
            self.log.debug(f"after update {self.latch_transitions=}")
            self.log.debug(f"latched {transition=} with {value=}")
            now = time.time()
            last_transition_ts = self.per_transition_cooldown_ts.get(transition, 0)
            sec_since_trigger = now - last_transition_ts
            debounce_expired = sec_since_trigger > transition.debounce_sec
            self.log.debug(f"Checking for debounce: {sec_since_trigger=} {debounce_expired=}")
            if debounce_expired:
                utterance = choice(utterance_choices)
                self.log.debug(f"Submitting speech request: {utterance}")
                self.enqueue_speech_request(utterance)
                # Start the cooldown; without recording this the debounce
                # interval would always be measured from the epoch and
                # never suppress anything.
                self.per_transition_cooldown_ts[transition] = now
            else:
                self.log.debug(f"Would have spoken, but it's only been {sec_since_trigger=}")
        elif matched_before and not matches_now:
            self.log.debug(f"un-latch {transition}, so next time we change to a value that compares True we trigger again. ({last_value=} {value=})")
            self.log.debug(f"before {self.latch_transitions=}")
            # pop() rather than del: if nothing was latched, last_value is
            # None and there is no entry to remove.
            self.latch_transitions.pop(transition, None)
            self.log.debug(f"after {self.latch_transitions=}")
        else:
            self.log.debug(f"Got {new_message.device}.{new_message.name} but {transition=} did not match")

    def preprocess(self, speech):
        """Fill ``{device.property.element}`` placeholders in SSML markup.

        Recordings are returned untouched. For SSML, each ``{...}``
        placeholder is looked up on the INDI client; values that convert
        to float are formatted with one decimal place, anything else is
        converted with ``str``. Placeholders whose value is ``None`` are
        left in the text as-is. Returns a new ``SSML`` object.
        """
        if isinstance(speech, Recording):
            return speech
        speech_text = speech.markup
        substitutables = re.findall(r"({[^}]+})", speech.markup)
        for sub in substitutables:
            indi_id = sub[1:-1]  # strip the surrounding braces
            value = self.client[indi_id]
            if hasattr(value, 'value'):
                value = value.value
            self.log.debug(f"Replacing {repr(sub)} with {value=}")
            if value is not None:
                try:
                    value = float(value)
                    value = "{:.1f}".format(value)
                except (TypeError, ValueError):
                    value = str(value)
                speech_text = speech_text.replace(sub, value)
        return SSML(speech_text)

    def handle_personality_switch(self, prop : properties.IndiProperty, new_message):
        """INDI callback: switch to the personality whose element was set ON."""
        if not isinstance(new_message, messages.IndiNewMessage):
            return
        active_personality = None
        for elem in prop:
            prop[elem] = constants.SwitchState.OFF
            if elem in new_message and new_message[elem] is constants.SwitchState.ON:
                active_personality = elem

        if active_personality is not None:
            self.log.info(f"Switching to {active_personality=}")
            self.load_personality(active_personality)
        # Every element was cleared above; re-assert the one that is
        # actually active (load_personality updates self.active_personality).
        prop[self.active_personality] = constants.SwitchState.ON
        self.update_property(prop)

    def handle_soundboard_switch(self, prop: properties.IndiProperty, new_message):
        """INDI callback: enqueue the soundboard entry for each element set ON."""
        self.log.debug(f"{new_message}")
        if not isinstance(new_message, messages.IndiNewMessage):
            return
        for elem in new_message:
            if new_message[elem] is constants.SwitchState.ON:
                if elem not in self.personality.soundboard:
                    # e.g. a client still showing buttons from a previously
                    # loaded personality
                    self.log.warning(f"Soundboard has no entry named {elem!r}, ignoring")
                    continue
                srq = self.personality.soundboard[elem]
                self.log.info(f"Soundboard requested {srq}")
                self.enqueue_speech_request(srq)
        # set everything off again
        self.update_property(prop)

    def load_personality(self, personality_name):
        """Load ``personalities/<personality_name>.xml`` and make it active.

        Unregisters the previous personality's reaction callbacks, rebuilds
        the ``soundboard`` switch property, registers one callback per
        reaction transition, and pre-synthesizes every SSML utterance that
        has no ``{...}`` substitutions into the cache directory.
        """
        personality_file = os.path.join(HERE, "personalities", f"{personality_name}.xml")
        self.log.info(f"Loading personality from {personality_file}")
        # Parse before tearing anything down, so a missing or malformed
        # file raises while the previous personality is still fully wired.
        new_personality = Personality.from_path(personality_file)

        for cb, device_name, property_name, _ in self._cb_handles:
            try:
                self.client.unregister_callback(cb, device_name=device_name, property_name=property_name)
            except ValueError:
                self.log.exception(f"Tried to remove {cb=} {device_name=} {property_name=}")
        self._cb_handles = set()
        if self.soundboard_sw_prop is not None:
            self.delete_property(self.soundboard_sw_prop)

        self.personality = new_personality

        self.soundboard_sw_prop = properties.SwitchVector(
            name="soundboard",
            rule=constants.SwitchRule.ONE_OF_MANY,
            perm=constants.PropertyPerm.READ_WRITE,
        )
        for btn_name in self.personality.soundboard:
            self.soundboard_sw_prop.add_element(DefSwitch(name=btn_name, _value=constants.SwitchState.OFF))
        self.add_property(self.soundboard_sw_prop, callback=self.handle_soundboard_switch)

        self.default_voice = self.personality.default_voice

        for reaction in self.personality.reactions:
            device_name, property_name, element_name = reaction.indi_id.split('.')
            self.client.get_properties(reaction.indi_id)
            for t in reaction.transitions:
                cb = partial(self.reaction_handler, element_name=element_name, transition=t, utterance_choices=reaction.transitions[t])
                self.client.register_callback(
                    cb,
                    device_name=device_name,
                    property_name=property_name
                )
                self._cb_handles.add((cb, device_name, property_name, t))
                self.log.debug(f"Registered reaction handler on {device_name=} {property_name=} {element_name=} using transition {t}")
                for utterance in reaction.transitions[t]:
                    self.log.debug(f"{reaction.indi_id}: {t}: {utterance}")
                    if isinstance(utterance, SSML):
                        if not contains_substitutions(utterance.markup):
                            result = ssml_to_wav(utterance.markup, self.default_voice, self.api_url, self.config.cache.path)
                            self.log.debug(f"Caching synthesis to {result}")
                        else:
                            self.log.debug("Cannot pre-cache because there are substitutions to be made")
        self.active_personality = personality_name
        self.telem("load_personality", {'name': personality_name})
        self.send_all_properties()

    def setup(self):
        """Initialize state, wait for the client, and publish INDI properties."""
        self.last_utterance_ts = time.time()
        self.latch_transitions = {}
        self.per_transition_cooldown_ts = {}
        self._cb_handles = set()
        self._speech_requests = []

        while self.client.status is not constants.ConnectionStatus.CONNECTED:
            self.log.info("Waiting for connection...")
            time.sleep(1)
        self.log.info("Connected.")
        self.log.debug(f"Caching synthesis output to {self.config.cache.path}")
        self.config.cache.ensure_exists()
        self.load_personality(self.active_personality)

        sv = properties.SwitchVector(
            name="mute",
            rule=constants.SwitchRule.ONE_OF_MANY,
            perm=constants.PropertyPerm.READ_WRITE,
        )
        sv.add_element(DefSwitch(name="toggle", _value=constants.SwitchState.ON if self.mute else constants.SwitchState.OFF))
        self.add_property(sv, callback=self.handle_mute_toggle)

        sv = properties.SwitchVector(
            name="personality",
            rule=constants.SwitchRule.ONE_OF_MANY,
            perm=constants.PropertyPerm.READ_WRITE,
        )
        for pers in self.personalities:
            self.log.debug(f"{pers=}")
            sv.add_element(DefSwitch(name=pers, _value=constants.SwitchState.ON if self.active_personality == pers else constants.SwitchState.OFF))
        self.add_property(sv, callback=self.handle_personality_switch)

        speech_text = properties.TextVector(name="speech_text", perm=constants.PropertyPerm.READ_WRITE)
        speech_text.add_element(DefText(
            name="current",
            _value=None,
        ))
        speech_text.add_element(DefText(
            name="target",
            _value=None,
        ))
        self.add_property(speech_text, callback=self.handle_speech_text)

        speech_request = properties.SwitchVector(
            name="speak",
            rule=constants.SwitchRule.ANY_OF_MANY,
        )
        speech_request.add_element(DefSwitch(name="request", _value=constants.SwitchState.OFF))
        self.add_property(speech_request, callback=self.handle_speech_request)

        reload_request = properties.SwitchVector(
            name="reload_personality",
            rule=constants.SwitchRule.ANY_OF_MANY,
        )
        reload_request.add_element(DefSwitch(name="request", _value=constants.SwitchState.OFF))
        self.add_property(reload_request, callback=self.handle_reload_request)

        self.log.info("Set up complete")

    def loop(self):
        """Speak all queued requests, then possibly a random utterance.

        Queued requests are preprocessed and spoken in order (or only
        logged when muted). If the configured interval has elapsed since
        the last utterance and the personality defines random utterances,
        one is chosen, avoiding an immediate repeat when an alternative
        exists.
        """
        while len(self._speech_requests):
            speech = self.preprocess(self._speech_requests.pop(0))
            if self.mute:
                self.log.debug(f"Would have said: {repr(speech)}, but muted")
            else:
                self.log.info(f"Speaking: {repr(speech)}")
                speak(speech, self.default_voice, self.api_url, self.config.cache.path)
                self.log.debug("Speech complete")
            self.last_utterance_ts = time.time()  # update timestamp to prevent random utterances

        random_utterances = self.personality.random_utterances
        if time.time() - self.last_utterance_ts > self.config.random_utterance_interval_sec and len(random_utterances):
            # Prefer something other than the previous pick, but fall back
            # to the full list when there is no alternative; re-drawing
            # until the pick differs would never terminate with a single
            # (or all-equal) utterance.
            fresh = [u for u in random_utterances if not (u == self.last_utterance_chosen)]
            next_utterance = choice(fresh if fresh else random_utterances)
            self.last_utterance_chosen = next_utterance
            self.last_utterance_ts = time.time()
            if self.mute:
                self.log.debug(f"Would have said: {repr(next_utterance)}, but muted")
            else:
                self.log.info(f"Randomly spouting off: {repr(next_utterance)}")
                speak(next_utterance, self.default_voice, self.api_url, self.config.cache.path)
def reaction_handler(self, new_message, element_name, transition, utterance_choices)
Definition: core.py:90
def handle_personality_switch(self, properties.IndiProperty prop, new_message)
Definition: core.py:148
def handle_reload_request(self, existing_property, new_message)
Definition: core.py:74
def handle_speech_request(self, existing_property, new_message)
Definition: core.py:64
def enqueue_speech_request(self, sr)
Definition: core.py:51
def load_personality(self, personality_name)
Definition: core.py:175
def handle_speech_text(self, existing_property, new_message)
Definition: core.py:57
def handle_soundboard_switch(self, properties.IndiProperty prop, new_message)
Definition: core.py:163
def preprocess(self, speech)
Definition: core.py:128
def handle_mute_toggle(self, existing_property, new_message)
Definition: core.py:80
def contains_substitutions(text)
Definition: core.py:32
def drop_xml_tags(raw_xml)
Definition: core.py:24
def speak(speech, default_voice, api_url, cache_dir)
def ssml_to_wav(ssml_text, default_voice, api_url, cache_dir)