import threading
import queue
import json
import time
import os
import re
import subprocess

import sounddevice as sd

# =========================
# CONFIGURATION
# =========================
VOSK_MODEL_PATH = "/home/tino/Documents/_Documents/Schule/4 Wilhelm Maybach Schule/2BKI Jahr 2/Abschlussprojekt/test assistant/assistant_all_in_one/vosk-model-de-0.21/"
PIPER_BIN = "piper"
PIPER_MODEL = "de_DE-thorsten-medium.onnx"
SAMPLE_RATE = 22050  # playback rate (Hz) matching Piper's raw PCM output

# =========================
# STATES
# =========================
STATE_IDLE = "IDLE"          # waiting for the wake word
STATE_LISTENING = "LISTENING"  # accepting recognized speech
STATE_SPEAKING = "SPEAKING"    # TTS output in progress; STT results ignored

# =========================
# GLOBAL STATE
# =========================
state = STATE_IDLE

# Dialogue context for the currently active intent.
context = {
    "intent": None,          # name of the detected intent, or None
    "slots": {},             # slot name -> value collected so far
    "required_slots": [],    # slots the intent needs before it can run
    "pending_slot": None,    # slot we just asked the user about, or None
}

# Recognized utterances flow from the Vosk thread to the main loop here.
audio_queue = queue.Queue()


# =========================
# TTS (PIPER)
# =========================
def speak(text):
    """Synthesize *text* with Piper and play it via aplay (blocking).

    Sets the global state to SPEAKING for the duration of the call and to
    LISTENING afterwards, so the STT pipeline ignores the assistant's own
    voice while it talks.
    """
    global state
    state = STATE_SPEAKING
    print(f"[TTS] {text}")
    # Piper writes raw 16-bit mono PCM to stdout when --output-raw is given.
    synth = subprocess.run(
        [PIPER_BIN, "--model", PIPER_MODEL, "--output-raw"],
        input=text.encode("utf-8"),
        stdout=subprocess.PIPE,
    )
    # Pipe the raw PCM straight into aplay; blocks until playback finishes.
    subprocess.run(
        ["aplay", "-r", str(SAMPLE_RATE), "-f", "S16_LE"],
        input=synth.stdout,
    )
    state = STATE_LISTENING


# =========================
# INTENTS & SLOTS
# =========================
INTENTS = {
    "weather": {
        "keywords": ["wetter", "temperatur", "regen"],
        "required_slots": ["location"],
    },
    "timer": {
        "keywords": ["timer"],
        "required_slots": ["duration"],
    },
}


def detect_intent(text):
    """Return the name of the first intent whose keyword occurs in *text*.

    Matching is case-insensitive and uses word boundaries, so a keyword
    only matches as a whole word ("regen" no longer matches inside
    "anregen", which the previous substring check allowed).

    Returns None when no keyword matches.
    """
    text = text.lower()
    for name, data in INTENTS.items():
        if any(
            re.search(rf"\b{re.escape(keyword)}\b", text)
            for keyword in data["keywords"]
        ):
            return name
    return None


# =========================
# SKILLS
# =========================
def weather_skill(slots):
    """Return a (stubbed) weather answer for slots["location"]."""
    location = slots["location"]
    return f"Das Wetter in {location} ist sonnig bei 20 Grad."


def timer_skill(slots):
    """Return a (stubbed) timer confirmation for slots["duration"]."""
    duration = slots["duration"]
    return f"Der Timer für {duration} Minuten wurde gestartet."
# Dispatch table: intent name -> skill callable taking the filled slots.
SKILLS = {
    "weather": weather_skill,
    "timer": timer_skill,
}


# =========================
# DIALOGUE LOGIC
# =========================
def handle_text(text):
    """Drive one turn of the dialogue with a recognized utterance.

    Order of operations: (1) a pending follow-up answer fills its slot,
    (2) an intent is detected if none is active, (3) the first missing
    required slot triggers a follow-up question, (4) once all slots are
    filled the skill runs, its answer is spoken, and the context resets.
    Utterances arriving while the assistant is not LISTENING are dropped.
    """
    global context, state

    if state != STATE_LISTENING:
        return

    print(f"[STT] {text}")

    # 1. Treat the utterance as the answer to a pending follow-up question.
    if context["pending_slot"]:
        context["slots"][context["pending_slot"]] = text
        context["pending_slot"] = None

    # 2. Detect the intent if none is active yet.
    if not context["intent"]:
        intent = detect_intent(text)
        if not intent:
            speak("Das habe ich nicht verstanden.")
            reset_context()
            return
        context["intent"] = intent
        context["required_slots"] = INTENTS[intent]["required_slots"]

    # 3. Ask for the first missing required slot and wait for the answer.
    for slot in context["required_slots"]:
        if slot not in context["slots"]:
            context["pending_slot"] = slot
            ask_for_slot(slot)
            return

    # 4. All slots present: run the skill and speak its result.
    result = SKILLS[context["intent"]](context["slots"])
    speak(result)
    reset_context()


def ask_for_slot(slot):
    """Speak the follow-up question associated with *slot*."""
    questions = {
        "location": "Für welchen Ort?",
        "duration": "Wie lange soll der Timer laufen?",
    }
    speak(questions.get(slot, "Bitte spezifizieren."))


def reset_context():
    """Clear the dialogue context and return to the IDLE state."""
    global context, state
    context = {
        "intent": None,
        "slots": {},
        "required_slots": [],
        "pending_slot": None,
    }
    state = STATE_IDLE


# =========================
# VOSK LISTENER
# =========================
def vosk_listener():
    """Continuously transcribe microphone audio and enqueue final results.

    Runs in its own thread; finalized (non-empty) utterances are put on
    ``audio_queue`` for the main loop to consume.
    """
    SAMPLE_RATE_VOSK = 16000  # the German Vosk model expects 16 kHz mono
    from vosk import Model, KaldiRecognizer
    import pyaudio

    model = Model(VOSK_MODEL_PATH)
    rec = KaldiRecognizer(model, SAMPLE_RATE_VOSK)

    p = pyaudio.PyAudio()
    stream = p.open(
        format=pyaudio.paInt16,
        channels=1,
        rate=SAMPLE_RATE_VOSK,
        input=True,
        frames_per_buffer=4000,
    )

    while True:
        data = stream.read(4000, exception_on_overflow=False)
        # AcceptWaveform returns True once an utterance is finalized.
        if rec.AcceptWaveform(data):
            result = json.loads(rec.Result())
            text = result.get("text", "")
            if text:
                audio_queue.put(text)


# ==========================
# WAKEWORD (SIMPLIFIED)
# ==========================
def fake_wakeword_detector():
    """Stub wake word: promote IDLE to LISTENING after a short delay.

    Sleeps on every iteration — the original version only slept while
    IDLE, which made this thread busy-spin at 100% CPU in any other state.
    """
    global state
    while True:
        if state == STATE_IDLE:
            state = STATE_LISTENING
            speak("Wie kann ich helfen?")
        time.sleep(0.1)


# ==========================
# WAKEWORD (PORCUPINE)
# ==========================
def real_wakeword_detector():
    """Listen for the wake word with Porcupine and switch to LISTENING.

    Runs in its own thread. Opens a sounddevice input stream matched to
    Porcupine's expected sample rate and frame length, and promotes the
    global state from IDLE to LISTENING whenever the keyword is detected.
    """
    import pvporcupine
    import numpy as np

    # NOTE(security): the access key should be loaded from an environment
    # variable or config file, not committed in source.
    ACCESS_KEY = "lpz+8e9omUnQtCQPeaawZauxVRqdhbcDH3fz19oZsp7zXKflWCiYMw=="
    WAKEWORD = "jarvis"  # built-in wake word

    porcupine = pvporcupine.create(
        access_key=ACCESS_KEY,
        keywords=[WAKEWORD],
    )

    def callback(indata, frames, time_info, status):
        # BUG FIX: ``global state`` must be declared in THIS nested scope.
        # The original declared it only in the enclosing function, so the
        # assignment below created a callback-local variable and the
        # global state was never actually updated on detection.
        # (The third parameter is also renamed from ``time`` so it no
        # longer shadows the time module.)
        global state
        pcm = np.frombuffer(indata, dtype=np.int16)
        if porcupine.process(pcm) >= 0 and state == STATE_IDLE:
            state = STATE_LISTENING
            print("WAKE WORD DETECTED")
            # NOTE(review): speaking inside the audio callback blocks the
            # stream; acceptable here, but a flag + main-loop TTS would be
            # cleaner.
            speak("Ja, wie kann ich helfen?")

    with sd.InputStream(
        samplerate=porcupine.sample_rate,
        channels=1,
        dtype="int16",
        blocksize=porcupine.frame_length,
        callback=callback,
    ):
        print("Listening...")
        while True:
            # Keep the stream alive without busy-waiting (the original
            # ``while True: pass`` pinned a full CPU core).
            time.sleep(1)


# =========================
# MAIN LOOP
# =========================
def main():
    """Start the STT and wake word threads, then dispatch utterances."""
    threading.Thread(target=vosk_listener, daemon=True).start()
    # threading.Thread(target=fake_wakeword_detector, daemon=True).start()
    threading.Thread(target=real_wakeword_detector, daemon=True).start()

    while True:
        try:
            text = audio_queue.get(timeout=0.1)
            handle_text(text)
        except queue.Empty:
            pass


if __name__ == "__main__":
    main()