# RasPi_Voice_Assistant--WIP/main.py
# (voice assistant main module — ~370 lines, Python)
import threading
import queue
import json
import time
import os
import subprocess
import sounddevice as sd
import re
import asyncio
#test
# =========================
# CONFIGURATION
# =========================
VOSK_MODEL_PATH = "/home/tino/Desktop/Abschlussprojekt/test assistant/cloneAssistantAllInOne/vosk-model-de-0.21/"  # German Vosk STT model directory
PIPER_BIN = "piper"  # Piper TTS executable (must be on PATH)
PIPER_MODEL = "de_DE-thorsten-medium.onnx"  # German voice for Piper
SAMPLE_RATE = 22050  # playback rate (Hz) for Piper's raw PCM output
# =========================
# STATES
# =========================
# Assistant state machine: IDLE until the wake word fires, LISTENING while
# accepting commands, SPEAKING while TTS audio is playing.
STATE_IDLE = "IDLE"
STATE_LISTENING = "LISTENING"
STATE_SPEAKING = "SPEAKING"
# =========================
# GLOBAL STATE
# =========================
state = STATE_IDLE  # current state-machine state, shared across threads
# Dialog context of the currently active command; cleared by reset_context().
context = {
"intent": None,  # detected intent name (key into INTENTS)
"slots": {},  # slot values extracted so far
"required_slots": [],  # slot-name -> regex mapping still to be satisfied
"pending_slot": None,  # slot we asked the user a follow-up question about
"action": None  # sub-action for intents that define an "actions" table
}
audio_queue = queue.Queue()  # recognized utterances, Vosk thread -> main loop
# =========================
# TTS (PIPER)
# =========================
def speak(text):
    """Synthesize *text* with Piper and play the raw PCM through aplay.

    Sets the global state to SPEAKING while audio is produced and played so
    the Vosk listener ignores the assistant's own voice, then returns to
    LISTENING.  The transition back is in a ``finally`` block: previously an
    exception (e.g. piper/aplay missing) left the state stuck at SPEAKING,
    making the assistant permanently deaf to input.

    :param text: sentence to speak (encoded as UTF-8 for Piper's stdin)
    """
    global state
    state = STATE_SPEAKING
    print(f"[TTS] {text}")
    try:
        synth = subprocess.Popen(
            [PIPER_BIN, "--model", PIPER_MODEL, "--output-raw"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
        )
        # Piper emits raw 16-bit mono PCM on stdout; collect it fully,
        # then hand it to aplay in one write.
        audio = synth.communicate(input=text.encode("utf-8"))[0]
        player = subprocess.Popen(
            ["aplay", "-r", str(SAMPLE_RATE), "-f", "S16_LE"],
            stdin=subprocess.PIPE,
        )
        player.communicate(audio)
    finally:
        state = STATE_LISTENING
# =========================
# INTENTS & SLOTS
# =========================
# Intent table: intent name -> configuration.
# "keywords" trigger the intent via substring match (see detect_intent).
# "required_slots" maps slot name -> regex whose group(1) is the slot value.
# "timer" instead defines an "actions" sub-table: each action carries its
# own keywords and required slots (dispatched in handle_text).
INTENTS = {
"weather": {
"keywords": ["wetter", "temperatur", "regen"],
"required_slots": {
# e.g. "wetter in berlin" -> captures "berlin"
"location": r"\bin\b\s*(\w+)"
},
"subactions": ["info"]  # NOTE(review): not referenced elsewhere in this file — confirm it is still needed
},
"timer": {
"keywords": ["timer"],
# "required_slots": {},
"actions":{
"start": {
"keywords": ["starte", "start", "beginne", "stelle"],
"required_slots": {
# e.g. "fünf minuten" -> group(1) captures the amount word only
"duration": r"(\w+)\s*(sekunden|sekunde|minuten|minute|stunden|stunde)"
}
},
"stop": {
"keywords": ["stopp", "stoppe", "beende"],
"required_slots": {}
},
"status": {
"keywords": ["status", "läuft", "noch"],
"required_slots": {}
}
}
}
}
# TODO: consider reworking keyword matching with regex word boundaries
def detect_intent(text):
    """Return the name of the first intent whose keyword occurs in *text*.

    Matching is case-insensitive and substring-based.  Returns None when no
    intent keyword is present in the utterance.
    """
    lowered = text.lower()
    for intent_name, intent_cfg in INTENTS.items():
        for keyword in intent_cfg["keywords"]:
            if keyword in lowered:
                return intent_name
    return None
# =========================
# SKILLS
# =========================
from weather_jetzt import get_weather_for_location
def weather_skill(slots):
    """Fetch the current weather for slots["location"] and phrase a reply.

    Runs the async weather lookup to completion; falls back to a generic
    "no data" message when the lookup returns nothing truthy.
    """
    place = slots["location"]
    data = asyncio.run(get_weather_for_location(place))
    if not data:
        return "Keine Wetterdaten verfügbar"
    return (
        f"Aktuell sind es in {data['location']} {data['temperatur']} Grad "
        f"und die Wetterlage sieht {data['wetterlage']} aus."
    )
def start_timer_skill(slots):
    """Confirm that a timer was started for slots["duration"].

    NOTE(review): stub — no real timer runs yet, and the reply hardcodes
    "Minuten" although the duration regex also accepts Sekunden/Stunden.
    """
    duration = slots["duration"]
    return f"Der Timer für {duration} Minuten wurde gestartet."
def stopp_timer_skill(slots):
    """Confirm that the timer was stopped (stub — no timer state exists yet)."""
    return "Timer wurde gestoppt."
def status_timer_skill(slots):
    """Report timer status (stub — placeholder text only)."""
    return "Status Timer Ausgabe"
# Skill dispatch table: intent name -> callable, or (for intents with
# actions) intent name -> {action name -> callable}.  Used in handle_text.
SKILLS = {
"weather": weather_skill,
"timer": {
"start": start_timer_skill,
"stop": stopp_timer_skill,
"status": status_timer_skill
}
}
# =========================
# DIALOGLOGIK
# =========================
def handle_text(text):
    """Advance the dialog with one recognized utterance.

    Flow: detect the intent (unless a previous turn already set one, e.g.
    while waiting for a missing slot), detect an optional sub-action, fill
    required slots (asking follow-up questions for missing ones), then
    dispatch the matching skill and speak its reply.

    Mutates the module-level ``context`` and reads ``state``; only acts
    while the assistant is in the LISTENING state.
    """
    global context, state
    if state != STATE_LISTENING:
        return
    print(f"[STT] {text}")
    lowered = text.lower()
    # 1. Detect the intent (skipped when one is pending from a prior turn).
    if not context["intent"]:
        intent = detect_intent(text)
        if not intent:
            speak("Das habe ich nicht verstanden.")
            reset_context()
            return
        context["intent"] = intent
    # 2. Detect the sub-action for intents that define one (timer
    #    start/stop/status).  Fix: match against the lowercased text —
    #    previously lowercase keywords were compared to the raw utterance,
    #    so a capitalized command never matched.  Skipped once an action
    #    is set, so a later slot answer cannot silently overwrite it.
    intent_data = INTENTS[context["intent"]]
    actions = intent_data.get("actions")
    if actions and context["action"] is None:
        for action_name, action_data in actions.items():
            if any(k in lowered for k in action_data.get("keywords", [])):
                context["action"] = action_name
                break
    # Edge case: the intent requires an action but the user named none.
    if actions and context["action"] is None:
        speak("Ungültige Eingabe, Aktion wurde nicht genannt")
        return
    # Pick the slot set for either the plain intent or the chosen action.
    if context["action"] is None:
        context["required_slots"] = intent_data["required_slots"]
    else:
        context["required_slots"] = actions[context["action"]]["required_slots"]
    # 3. Missing slots? check_required asks a follow-up and we wait for
    #    the next utterance.
    if not check_required(text):
        return
    # 4. Dispatch the skill, speak the result, and clear the dialog.
    if context["action"] is None:
        result = SKILLS[context["intent"]](context["slots"])
    else:
        result = SKILLS[context["intent"]][context["action"]](context["slots"])
    speak(result)
    reset_context()
def check_required(text):
    """Try to fill every still-missing required slot from *text*.

    For each unfilled slot in ``context["required_slots"]``, apply its regex
    to the lowercased utterance and store capture group 1 on success.  At
    the first slot that cannot be filled, ask the user for it, remember it
    as pending, and return False.  Returns True once all slots are present.

    Fix: removed an unused ``intent_data`` lookup and dead commented-out
    code from the original.
    """
    lowered = text.lower()
    for slot, pattern in context["required_slots"].items():
        if slot in context["slots"]:
            continue
        match = re.search(pattern, lowered)
        if match:
            # group(1) is the value capture in every slot pattern
            context["slots"][slot] = match.group(1)
        else:
            context["pending_slot"] = slot
            ask_for_slot(slot)
            return False
    context["pending_slot"] = None
    return True
def ask_for_slot(slot):
    """Speak a follow-up question asking the user for the given slot value."""
    prompts = {
        "location": "Für welchen Ort?",
        "duration": "Wie lange soll der Timer laufen?",
    }
    question = prompts.get(slot, "Bitte spezifizieren.")
    speak(question)
def reset_context():
    """Reset the dialog context to its empty shape and drop back to IDLE.

    After this the assistant waits for the wake word again.
    """
    global context, state
    context = dict(
        intent=None,
        slots={},
        required_slots=[],
        pending_slot=None,
        action=None,
    )
    state = STATE_IDLE
# =========================
# VOSK LISTENER
# =========================
def vosk_listener():
    """Background thread: stream microphone audio into Vosk STT.

    Final recognition results are pushed onto ``audio_queue`` for the main
    loop.  While the assistant is SPEAKING, input is skipped and the
    recognizer is reset so the assistant does not transcribe its own TTS
    output.  Fix: the SPEAKING branch now sleeps briefly instead of
    busy-spinning at full speed.
    """
    SAMPLE_RATE_VOSK = 16000  # the Vosk model expects 16 kHz mono PCM
    from vosk import Model, KaldiRecognizer
    import pyaudio
    model = Model(VOSK_MODEL_PATH)
    rec = KaldiRecognizer(model, SAMPLE_RATE_VOSK)
    p = pyaudio.PyAudio()
    stream = p.open(
        format=pyaudio.paInt16,
        channels=1,
        rate=SAMPLE_RATE_VOSK,
        input=True,
        frames_per_buffer=4000,
    )
    while True:
        if state != STATE_SPEAKING:  # ignore audio while our own TTS plays
            data = stream.read(4000, exception_on_overflow=False)
            if rec.AcceptWaveform(data):
                result = json.loads(rec.Result())
                text = result.get("text", "")
                if text:
                    audio_queue.put(text)
        else:
            # Drop any partial hypothesis accumulated before speaking
            # started, and back off instead of spinning the CPU.
            rec.Reset()
            time.sleep(0.05)
# =========================
# WAKEWORD (SIMPLIFIZIERT)
# =========================
"""
def fake_wakeword_detector():
global state
while True:
if state == STATE_IDLE:
time.sleep(0.1)
state = STATE_LISTENING
speak("Wie kann ich helfen?")
"""
# ==========================
# WAKEWORD (PORCUPINE)
# ==========================
def real_wakeword_detector():
    """Background thread: listen for the "jarvis" wake word via Porcupine.

    On detection, switch the global state to LISTENING and prompt the user.
    Runs only if the assistant starts in the IDLE state.
    """
    import pvporcupine
    import numpy as np
    # NOTE(review): the access key is hardcoded; move it to an environment
    # variable or a config file before sharing/publishing this code.
    ACCESS_KEY = "lpz+8e9omUnQtCQPeaawZauxVRqdhbcDH3fz19oZsp7zXKflWCiYMw=="
    WAKEWORD = "jarvis"  # built-in Porcupine keyword
    porcupine = pvporcupine.create(
        access_key=ACCESS_KEY,
        keywords=[WAKEWORD]
    )
    if state == STATE_IDLE:
        def callback(indata, frames, time_info, status):
            # Fix: declare the global inside the nested callback — the
            # original's `global state` in the enclosing function does not
            # extend here, so the assignment created a dead local and the
            # global state was never actually updated by wake-word hits
            # (it only appeared to work because speak() sets LISTENING).
            global state
            pcm = np.frombuffer(indata, dtype=np.int16)
            if porcupine.process(pcm) >= 0:
                # Brief pause so trailing wake-word audio is not transcribed
                # as hallucinated words ("eine", "jarvis", ...).
                time.sleep(1)
                state = STATE_LISTENING
                print("WAKE WORD DETECTED")
                speak("Ja?")
        with sd.InputStream(
            samplerate=porcupine.sample_rate,
            channels=1,
            dtype="int16",
            blocksize=porcupine.frame_length,
            callback=callback,
        ):
            print("Listening...")
            while True:
                # Fix: sleep instead of `pass` — the original busy-waited
                # and pinned a CPU core just to keep the stream open.
                time.sleep(1)
# =========================
# MAIN LOOP
# =========================
def main():
    """Start the STT and wake-word threads, then pump recognized utterances.

    The main loop blocks briefly on the audio queue and hands each
    recognized text to the dialog handler.
    """
    threading.Thread(target=vosk_listener, daemon=True).start()
    threading.Thread(target=real_wakeword_detector, daemon=True).start()
    while True:
        try:
            utterance = audio_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        handle_text(utterance)


if __name__ == "__main__":
    main()