# File-viewer metadata (not part of the program): 465 lines, 12 KiB, Python
import threading
|
|
import queue
|
|
import json
|
|
import time
|
|
import os
|
|
import subprocess
|
|
import sounddevice as sd
|
|
import re
|
|
import asyncio
|
|
from weather_jetzt import get_weather_for_location
|
|
from timer_control import parse_time, start_timer, stop_timer, timer_status_info, format_duration
|
|
from wecker_control import parse_time_wecker, minutes_until, start_wecker, stop_wecker, wecker_status_info, calculate_target_datetime
|
|
|
|
#test
|
|
|
|
# =========================
|
|
# KONFIGURATION
|
|
# =========================
|
|
|
|
# Paths and binaries can be overridden through environment variables so the
# assistant is not tied to one developer machine; the defaults are the values
# that used to be hard-coded here.
VOSK_MODEL_PATH = os.environ.get(
    "VOSK_MODEL_PATH",
    "/home/tino/Desktop/Abschlussprojekt/test assistant/cloneAssistantAllInOne/vosk-model-de-0.21/",
)
PIPER_BIN = os.environ.get("PIPER_BIN", "piper")
PIPER_MODEL = os.environ.get("PIPER_MODEL", "de_DE-thorsten-medium.onnx")
# Sample rate in Hz handed to aplay (-r) when playing Piper's raw output.
SAMPLE_RATE = 22050
|
|
|
|
# =========================
|
|
# STATES
|
|
# =========================
|
|
|
|
# Dialog states of the assistant; the string values double as readable labels.
STATE_IDLE, STATE_LISTENING, STATE_SPEAKING = "IDLE", "LISTENING", "SPEAKING"
|
|
|
|
# =========================
|
|
# GLOBALER ZUSTAND
|
|
# =========================
|
|
|
|
# Current dialog state; rebound by speak(), reset_context() and main().
state = STATE_IDLE

# Working memory of the dialog that is currently in progress:
#   intent         - name of the detected intent (key of INTENTS) or None
#   slots          - slot values extracted so far, keyed by slot name
#   required_slots - slot patterns required by the current intent/action
#   pending_slot   - slot the user was last asked for, or None
#   action         - detected action of the intent, or None
context = dict(
    intent=None,
    slots={},
    required_slots=[],
    pending_slot=None,
    action=None,
)

# Queue for recognized utterances; only the currently disabled listener code
# further down in this file puts to / reads from it.
audio_queue = queue.Queue()
|
|
|
|
# =========================
|
|
# TTS (PIPER)
|
|
# =========================
|
|
|
|
def speak(text):
    """Speak *text* aloud through Piper and aplay.

    The text is piped to the Piper binary, which is asked for raw audio
    (``--output-raw``); that audio is then piped to ``aplay`` as signed
    16-bit little-endian samples at ``SAMPLE_RATE``.

    While the audio is produced and played the global ``state`` is
    ``STATE_SPEAKING``; afterwards it is set to ``STATE_LISTENING``.

    Args:
        text: The sentence to speak (also echoed to stdout).

    Raises:
        OSError: If the Piper or aplay executable cannot be started. The
            state is still switched back to ``STATE_LISTENING`` so the
            assistant does not stay stuck in the speaking state.
    """
    global state
    state = STATE_SPEAKING
    print(f"[TTS] {text}")

    try:
        synthesis = subprocess.run(
            [PIPER_BIN, "--model", PIPER_MODEL, "--output-raw"],
            input=text.encode("utf-8"),
            stdout=subprocess.PIPE,
        )
        subprocess.run(
            ["aplay", "-r", str(SAMPLE_RATE), "-f", "S16_LE"],
            input=synthesis.stdout,
        )
    finally:
        # Always leave the speaking state: handle_text() ignores every input
        # while state != STATE_LISTENING, so a failed TTS call would otherwise
        # mute the assistant permanently.
        state = STATE_LISTENING
|
|
|
|
|
|
# =========================
|
|
# INTENTS & SLOTS
|
|
# =========================
|
|
|
|
# Slot patterns, applied with re.search() to the lowercased utterance.
_LOCATION_PATTERN = r"\bin\b\s*(\w+)"
_DURATION_PATTERN = r"((?:\w+)\s*(?:sekunden|sekunde|minuten|minute|stunden|stunde))"
_ALARM_TIME_PATTERN = r"\b([\w]+)\s*uhr(?:\s*([\w]+))?\b"

# Intent table.
#
# Each intent has "keywords" that select it. An intent either carries its
# "required_slots" directly (weather) or defines "actions", each with its own
# keywords and required slots (timer, wecker).
#
# Dictionary order matters: detect_intent() returns the first intent whose
# keyword occurs in the text, and handle_text() picks the first matching
# action. Because "timer" is listed before "wecker", the "timer" keyword of
# the "wecker" intent can never select "wecker".
INTENTS = {
    "weather": {
        "keywords": ["wetter", "temperatur", "regen"],
        "required_slots": {"location": _LOCATION_PATTERN},
        "subactions": ["info"],
    },
    "timer": {
        "keywords": ["timer"],
        "actions": {
            "start": {
                "keywords": ["starte", "start", "beginne", "stelle"],
                "required_slots": {"duration": _DURATION_PATTERN},
            },
            "stop": {
                "keywords": ["stopp", "stoppe", "beende"],
                "required_slots": {},
            },
            "status": {
                "keywords": ["status", "läuft", "noch"],
                "required_slots": {},
            },
        },
    },
    "wecker": {
        "keywords": ["wecker", "timer"],
        "actions": {
            "start": {
                "keywords": ["erstelle", "stelle"],
                "required_slots": {"timeSet": _ALARM_TIME_PATTERN},
            },
            "stop": {
                "keywords": ["stopp", "stoppe", "entferne"],
                "required_slots": {},
            },
            "status": {
                "keywords": ["status", "läuft", "noch"],
                "required_slots": {},
            },
        },
    },
}
|
|
|
|
# TODO: possibly rework the keyword matching below with regular expressions
|
|
|
|
def detect_intent(text):
    """Return the name of the first intent whose keyword occurs in *text*.

    Matching is a case-insensitive substring test, so a keyword also matches
    inside a longer word. Intents are tried in the order of ``INTENTS``.

    Args:
        text: The user's utterance.

    Returns:
        The intent name, or ``None`` if no keyword of any intent is found.
    """
    lowered = text.lower()
    return next(
        (
            intent_name
            for intent_name, spec in INTENTS.items()
            if any(keyword in lowered for keyword in spec["keywords"])
        ),
        None,
    )
|
|
|
|
|
|
# =========================
|
|
# SKILLS
|
|
# =========================
|
|
|
|
|
|
## WEATHER
|
|
def weather_skill(slots):
    """Build the spoken weather report for the requested location.

    Args:
        slots: Filled slots; ``slots["location"]`` is passed unchanged to
            ``get_weather_for_location``.
            NOTE(review): check_required() stores the whole regex match, so
            this value presumably still contains the leading "in " - confirm
            that the weather lookup copes with that.

    Returns:
        A German sentence with location, temperature and conditions, or a
        fallback sentence when the lookup yields a falsy result.
    """
    report = asyncio.run(get_weather_for_location(slots["location"]))

    if not report:
        return "Keine Wetterdaten verfügbar"

    return (
        f"Aktuell sind es in {report['location']} {report['temperatur']} Grad "
        f"und die Wetterlage sieht {report['wetterlage']} aus."
    )
|
|
|
|
## TIMER
|
|
def start_timer_skill(slots):
    """Start a timer for the duration given in ``slots["duration"]``.

    Args:
        slots: Filled slots; ``slots["duration"]`` is the spoken duration
            text that ``parse_time`` converts to seconds.

    Returns:
        A confirmation sentence, or an error sentence when ``parse_time``
        returns a falsy value. The function always returns a string because
        the caller passes the result straight to ``speak()``, which would
        crash on ``None``.
    """
    seconds = parse_time(slots["duration"])
    if not seconds:
        return "Die Dauer konnte nicht erkannt werden."

    start_timer(seconds)
    return "Timer gestartet"
|
|
|
|
def stopp_timer_skill(slots):
    """Stop the timer via ``stop_timer()`` and return the spoken confirmation.

    Args:
        slots: Unused; present so all skills share one call signature.
    """
    stop_timer()
    confirmation = "Timer wurde gestoppt"
    return confirmation
|
|
|
|
def status_timer_skill(slots):
    """Describe the timer state reported by ``timer_status_info()``.

    Args:
        slots: Unused; present so all skills share one call signature.

    Returns:
        A German sentence for the "running", "finished" or "stopped" status,
        or a "no timer running" sentence for any other status.
    """
    info = timer_status_info()
    status = info["status"]

    if status == "running":
        # The remaining time is only looked up while the timer is running.
        return f"Der Timer läuft noch {format_duration(info['remaining'])} "

    fixed_replies = (
        ("finished", "Der Timer ist abgelaufen"),
        ("stopped", "Der Timer wurde gestoppt"),
    )
    for label, reply in fixed_replies:
        if status == label:
            return reply
    return "Es läuft kein Timer"
|
|
|
|
#ALARM
|
|
def start_wecker_skill(slots):
    """Set an alarm for the clock time given in ``slots["timeSet"]``.

    Args:
        slots: Filled slots; ``slots["timeSet"]`` is the spoken time text
            handed to ``parse_time_wecker``.

    Returns:
        A confirmation sentence with the alarm time and the minutes until it
        rings, or an error sentence when the time cannot be parsed.
    """
    clock_time = parse_time_wecker(slots["timeSet"])
    if not clock_time:
        return "Die Uhrzeit konnte nicht erkannt werden."

    hour, minute = clock_time
    alarm_at = calculate_target_datetime(hour, minute)
    start_wecker(alarm_at)
    remaining_minutes = minutes_until(alarm_at)

    return f"Wecker für {hour:02d}:{minute:02d} wurde gestellt er klingelt in {remaining_minutes} minuten"
|
|
|
|
def stopp_wecker_skill(slots):
    """Stop the alarm via ``stop_wecker()``, log it and return the confirmation.

    Args:
        slots: Unused; present so all skills share one call signature.
    """
    stop_wecker()
    confirmation = "Wecker wurde gestoppt"
    print(confirmation)
    return confirmation
|
|
|
|
def status_wecker_skill(slots):
    """Describe the alarm state reported by ``wecker_status_info()``.

    Args:
        slots: Unused; present so all skills share one call signature.

    Returns:
        A German sentence for the "running", "finished" or "stopped" status,
        or a "no alarm set" sentence for any other status.
    """
    info = wecker_status_info()
    status = info["status"]

    if status == "running":
        # Fixed: the message used the undefined name `wecker_target_time`,
        # which raised NameError whenever an alarm was running.
        target_time = info["target_time"]
        return f"Ein Wecker wurde bereits auf {target_time} gestellt"
    if status == "finished":
        return "Der Wecker ist abgelaufen"
    if status == "stopped":
        return "Der Wecker wurde gestoppt, es ist kein Wecker gestellt"
    return "Es ist kein Wecker gestellt"
|
|
|
|
|
|
|
|
# Skill dispatch table, looked up by handle_text().
# An intent without actions maps directly to its skill function; an intent
# with actions maps to a dict from action name to skill function. Every skill
# takes the filled slots dict as its only argument.
SKILLS = {
    "weather": weather_skill,
    "timer": dict(
        start=start_timer_skill,
        stop=stopp_timer_skill,
        status=status_timer_skill,
    ),
    "wecker": dict(
        start=start_wecker_skill,
        stop=stopp_wecker_skill,
        status=status_wecker_skill,
    ),
}
|
|
|
|
# =========================
|
|
# DIALOGLOGIK
|
|
# =========================
|
|
|
|
def handle_text(text):
    """Advance the dialog by one user utterance.

    Steps: detect the intent (unless one is already active), detect the
    action for intents that define actions, collect the required slots and,
    once everything is known, run the matching skill, speak its result and
    reset the dialog context.

    Input is ignored unless the global ``state`` is ``STATE_LISTENING``.

    Args:
        text: The recognized or typed utterance.

    Returns:
        ``False`` when the intent needs an action but none was named
        (kept for backward compatibility), otherwise ``None``.
    """
    if state != STATE_LISTENING:
        return

    print(f"[STT] {text}")

    # All keyword lists are lowercase; normalise once so that intent, action
    # and slot matching treat typed input like "Starte Timer" consistently.
    normalized = text.lower()

    # 1. Detect the intent, unless a dialog is already in progress.
    if not context["intent"]:
        intent = detect_intent(normalized)
        if not intent:
            speak("Das habe ich nicht verstanden.")
            reset_context()
            return
        context["intent"] = intent

    intent_data = INTENTS[context["intent"]]
    actions = intent_data.get("actions")

    # 2. Detect the action. This has to happen before the slot check because
    #    the required slots depend on the action. An action that was already
    #    chosen is kept, so that a follow-up answer to a slot question
    #    (e.g. "noch fünf minuten") cannot silently switch it.
    if actions and context["action"] is None:
        for action_name, action_data in actions.items():
            if any(k in normalized for k in action_data.get("keywords", [])):
                context["action"] = action_name
                break

    # Edge case: the intent requires an action but the user named none.
    # The intent stays active, so the next utterance can supply the action.
    if actions and context["action"] is None:
        speak("Ungültige Eingabe, Aktion wurde nicht genannt")
        return False

    if context["action"] is None:
        context["required_slots"] = intent_data["required_slots"]
    else:
        context["required_slots"] = actions[context["action"]]["required_slots"]

    # 3. Fill the required slots; check_required() asks for a missing one.
    if not check_required(normalized):
        return

    # 4. Run the skill.
    if context["action"] is None:
        result = SKILLS[context["intent"]](context["slots"])
    else:
        result = SKILLS[context["intent"]][context["action"]](context["slots"])

    speak(result)
    reset_context()
|
|
|
|
def check_required(text):
    """Try to fill every still-missing required slot from *text*.

    Each pattern in ``context["required_slots"]`` is searched in the
    lowercased text. On the first slot that cannot be filled, the slot is
    recorded as ``context["pending_slot"]``, the user is asked for it and
    the check stops.

    Args:
        text: The user's utterance.

    Returns:
        ``True`` when all required slots are filled, ``False`` when the user
        was asked for a missing slot.
    """
    text = text.lower()

    for slot, pattern in context["required_slots"].items():
        if slot in context["slots"]:
            continue

        match = re.search(pattern, text)
        if match is None:
            # NOTE(review): a bare follow-up answer such as "berlin" does not
            # match the location pattern (it requires "in <word>"), so the
            # question is presumably repeated - confirm the intended dialog.
            context["pending_slot"] = slot
            ask_for_slot(slot)
            return False

        # The whole match is stored, not a capture group; the skills parse
        # the full matched phrase themselves.
        context["slots"][slot] = match.group(0)

    context["pending_slot"] = None
    return True
|
|
|
|
|
|
def ask_for_slot(slot):
    """Speak the question that asks the user for the missing *slot*.

    Args:
        slot: Slot name; unknown names get a generic request.
    """
    prompts = {
        "location": "Für welchen Ort?",
        "duration": "Wie lange soll der Timer laufen?",
        "timeSet": "Zu welcher Uhrzeit soll der Wecker klingeln?",
    }
    try:
        prompt = prompts[slot]
    except KeyError:
        prompt = "Bitte spezifizieren."
    speak(prompt)
|
|
|
|
|
|
def reset_context():
    """Discard the current dialog and return to the listening state.

    Rebinds the global ``context`` to a fresh, empty dialog record and sets
    the global ``state`` to ``STATE_LISTENING`` (not ``STATE_IDLE``).
    """
    global context, state
    context = dict(
        intent=None,
        slots={},
        required_slots=[],
        pending_slot=None,
        action=None,
    )
    state = STATE_LISTENING
|
|
|
|
|
|
# =========================
|
|
# VOSK LISTENER
|
|
# =========================
|
|
"""
|
|
def vosk_listener():
|
|
SAMPLE_RATE_VOSK = 16000
|
|
from vosk import Model, KaldiRecognizer
|
|
import pyaudio
|
|
|
|
model = Model(VOSK_MODEL_PATH)
|
|
rec = KaldiRecognizer(model, SAMPLE_RATE_VOSK)
|
|
|
|
p = pyaudio.PyAudio()
|
|
stream = p.open(
|
|
format=pyaudio.paInt16,
|
|
channels=1,
|
|
rate=SAMPLE_RATE_VOSK,
|
|
input=True,
|
|
frames_per_buffer=4000
|
|
)
|
|
# stream.start_stream()
|
|
|
|
while True:
|
|
if state != STATE_SPEAKING: #hinzugefügt um fehlerhafte eingaben zu stoppen
|
|
data = stream.read(4000, exception_on_overflow=False)
|
|
if rec.AcceptWaveform(data):
|
|
result = json.loads(rec.Result())
|
|
text = result.get("text", "")
|
|
if text:
|
|
audio_queue.put(text)
|
|
else:
|
|
rec.Reset()
|
|
|
|
"""
|
|
# =========================
|
|
# WAKEWORD (SIMPLIFIZIERT)
|
|
# =========================
|
|
"""
|
|
def fake_wakeword_detector():
|
|
global state
|
|
while True:
|
|
if state == STATE_IDLE:
|
|
time.sleep(0.1)
|
|
state = STATE_LISTENING
|
|
speak("Wie kann ich helfen?")
|
|
"""
|
|
# ==========================
|
|
# WAKEWORD (PORCUPINE)
|
|
# ==========================
|
|
|
|
"""
|
|
def real_wakeword_detector():
|
|
import pvporcupine
|
|
import numpy as np
|
|
|
|
global state
|
|
|
|
ACCESS_KEY = os.environ["PORCUPINE_ACCESS_KEY"]  # load the key from the environment; never commit it (the key that was committed here should be revoked)
|
|
WAKEWORD = "jarvis" # built-in wake word
|
|
|
|
porcupine = pvporcupine.create(
|
|
access_key=ACCESS_KEY,
|
|
keywords=[WAKEWORD]
|
|
)
|
|
if state == STATE_IDLE:
|
|
|
|
def callback(indata, frames, time_info, status):
|
|
pcm = np.frombuffer(indata, dtype=np.int16)
|
|
result = porcupine.process(pcm)
|
|
if result >= 0:
|
|
time.sleep(1) #verbesserung der spracheingabe: wurde hinzugefügt weil es sonst worte halluziniert (wie "eine", "jarvis")
|
|
state = STATE_LISTENING
|
|
print("WAKE WORD DETECTED")
|
|
#speak("Ja, wie kann ich helfen?")
|
|
speak("Ja?")
|
|
|
|
|
|
with sd.InputStream(
|
|
samplerate=porcupine.sample_rate,
|
|
channels=1,
|
|
dtype="int16",
|
|
blocksize=porcupine.frame_length,
|
|
callback=callback,
|
|
):
|
|
print("Listening...")
|
|
while True:
|
|
pass
|
|
|
|
"""
|
|
# =========================
|
|
# MAIN LOOP
|
|
# =========================
|
|
|
|
""" def main():
|
|
threading.Thread(target=vosk_listener, daemon=True).start()
|
|
# threading.Thread(target=fake_wakeword_detector, daemon=True).start()
|
|
threading.Thread(target=real_wakeword_detector, daemon=True).start()
|
|
|
|
while True:
|
|
try:
|
|
text = audio_queue.get(timeout=0.1)
|
|
handle_text(text)
|
|
except queue.Empty:
|
|
pass """
|
|
|
|
def main():
    """Run the assistant as a text console.

    Puts the assistant into the listening state and feeds every typed line
    to ``handle_text()``. The loop ends cleanly on end-of-input (Ctrl-D) or
    Ctrl-C at the prompt instead of terminating with a traceback.
    """
    global state
    state = STATE_LISTENING

    while True:
        try:
            text = input("Text input: ")
        except (EOFError, KeyboardInterrupt):
            print()  # finish the prompt line before exiting
            break
        handle_text(text)


if __name__ == "__main__":
    main()
|