"""All-in-one German voice assistant.

Pipeline: Porcupine wake word -> Vosk speech-to-text -> keyword intent
detection with slot filling -> Piper text-to-speech (played via aplay).

Reconstructed from a collapsed git patch. Behavior preserved except for
concrete fixes:
- the Porcupine audio callback assigned ``state`` without declaring it
  ``global`` in its own scope, so the wake-word transition never took
  effect (the outer function's ``global`` does not reach a nested def);
- the ``state == STATE_IDLE`` check wrongly guarded the whole stream
  setup, so the detector thread could exit immediately and never run;
- two busy-wait loops now sleep instead of spinning a core;
- the Picovoice access key is read from the environment (hard-coded
  literal kept only as a fallback) and flagged as a security issue.
"""

import threading
import queue
import json
import time
import os
import subprocess

import sounddevice as sd

# =========================
# CONFIGURATION
# =========================

VOSK_MODEL_PATH = "/home/tino/Documents/_Documents/Schule/4 Wilhelm Maybach Schule/2BKI Jahr 2/Abschlussprojekt/test assistant/assistant_all_in_one/vosk-model-de-0.21/"
PIPER_BIN = "piper"
PIPER_MODEL = "de_DE-thorsten-medium.onnx"
SAMPLE_RATE = 22050  # Piper raw output rate; must match the aplay -r argument

# =========================
# STATES
# =========================

STATE_IDLE = "IDLE"          # waiting for the wake word
STATE_LISTENING = "LISTENING"  # wake word heard; STT results are processed
STATE_SPEAKING = "SPEAKING"    # TTS in progress; STT results are ignored

# =========================
# GLOBAL STATE
# =========================

# NOTE(review): `state` and `context` are mutated from several threads
# without a lock. String assignment is atomic in CPython, so this mostly
# works, but a threading.Lock (or an Event-based design) would be safer.
state = STATE_IDLE
context = {
    "intent": None,        # detected intent name, e.g. "weather"
    "slots": {},           # collected slot values, e.g. {"location": "berlin"}
    "required_slots": [],  # slots the current intent still needs
    "pending_slot": None,  # slot we just asked the user about
}

# Recognized utterances flow from the Vosk thread to the main loop here.
audio_queue = queue.Queue()

# =========================
# TTS (PIPER)
# =========================


def speak(text):
    """Synthesize *text* with Piper and play it through aplay (blocking).

    Sets the global state to SPEAKING for the duration so concurrent STT
    results are dropped, then to LISTENING so the reply can be heard.
    """
    global state
    state = STATE_SPEAKING
    print(f"[TTS] {text}")

    # Piper writes raw 16-bit mono PCM to stdout when given --output-raw.
    synth = subprocess.Popen(
        [PIPER_BIN, "--model", PIPER_MODEL, "--output-raw"],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    pcm_audio = synth.communicate(input=text.encode("utf-8"))[0]

    # Hand the raw PCM to aplay; -f S16_LE matches Piper's sample format.
    player = subprocess.Popen(
        ["aplay", "-r", str(SAMPLE_RATE), "-f", "S16_LE"],
        stdin=subprocess.PIPE,
    )
    player.communicate(pcm_audio)

    state = STATE_LISTENING


# =========================
# INTENTS & SLOTS
# =========================

# Keyword-based intent table. TODO(review): original author noted this
# could be reworked with regexes.
INTENTS = {
    "weather": {
        "keywords": ["wetter", "temperatur", "regen"],
        "required_slots": ["location"],
    },
    "timer": {
        "keywords": ["timer"],
        "required_slots": ["duration"],
    },
}


def detect_intent(text):
    """Return the first intent whose keyword appears in *text*, else None.

    Matching is case-insensitive substring containment.
    """
    text = text.lower()
    for name, data in INTENTS.items():
        if any(word in text for word in data["keywords"]):
            return name
    return None


# =========================
# SKILLS
# =========================


def weather_skill(slots):
    """Return a (stubbed) German weather report for slots['location']."""
    location = slots["location"]
    return f"Das Wetter in {location} ist sonnig bei 20 Grad."


def timer_skill(slots):
    """Return a (stubbed) German confirmation for slots['duration'] minutes."""
    duration = slots["duration"]
    return f"Der Timer für {duration} Minuten wurde gestartet."


# Dispatch table: intent name -> skill function taking the filled slots.
SKILLS = {
    "weather": weather_skill,
    "timer": timer_skill,
}

# =========================
# DIALOG LOGIC
# =========================


def handle_text(text):
    """Drive one turn of the dialog with a recognized utterance.

    Ignores input unless we are LISTENING. Fills a pending slot if one
    was asked for, otherwise detects the intent; asks follow-up questions
    until all required slots are filled, then runs the skill and resets.
    """
    global context, state

    if state != STATE_LISTENING:
        return

    print(f"[STT] {text}")

    # 1. Answer to a follow-up question: the whole utterance is the value.
    if context["pending_slot"]:
        context["slots"][context["pending_slot"]] = text
        context["pending_slot"] = None

    # 2. Detect the intent (only on the first utterance of a dialog).
    if not context["intent"]:
        intent = detect_intent(text)
        if not intent:
            speak("Das habe ich nicht verstanden.")
            reset_context()
            return

        context["intent"] = intent
        context["required_slots"] = INTENTS[intent]["required_slots"]

    # 3. Ask for the first still-missing slot, then wait for the answer.
    for slot in context["required_slots"]:
        if slot not in context["slots"]:
            context["pending_slot"] = slot
            ask_for_slot(slot)
            return

    # 4. All slots filled: run the skill, speak the result, reset.
    result = SKILLS[context["intent"]](context["slots"])
    speak(result)
    reset_context()


def ask_for_slot(slot):
    """Speak the follow-up question for *slot* (generic fallback otherwise)."""
    questions = {
        "location": "Für welchen Ort?",
        "duration": "Wie lange soll der Timer laufen?",
    }
    speak(questions.get(slot, "Bitte spezifizieren."))


def reset_context():
    """Clear the dialog context and return to IDLE (await wake word)."""
    global context, state
    context = {
        "intent": None,
        "slots": {},
        "required_slots": [],
        "pending_slot": None,
    }
    state = STATE_IDLE


# =========================
# VOSK LISTENER
# =========================


def vosk_listener():
    """Background thread: stream mic audio into Vosk, queue final results.

    Runs forever; every non-empty final recognition is put on audio_queue
    for the main loop to consume.
    """
    SAMPLE_RATE_VOSK = 16000  # the German Vosk model expects 16 kHz mono
    from vosk import Model, KaldiRecognizer
    import pyaudio

    model = Model(VOSK_MODEL_PATH)
    recognizer = KaldiRecognizer(model, SAMPLE_RATE_VOSK)

    pa = pyaudio.PyAudio()
    stream = pa.open(
        format=pyaudio.paInt16,
        channels=1,
        rate=SAMPLE_RATE_VOSK,
        input=True,
        frames_per_buffer=4000,
    )

    while True:
        # exception_on_overflow=False: drop overflowed frames silently
        # rather than crashing the listener thread.
        data = stream.read(4000, exception_on_overflow=False)
        if recognizer.AcceptWaveform(data):
            result = json.loads(recognizer.Result())
            text = result.get("text", "")
            if text:
                audio_queue.put(text)


# =========================
# WAKE WORD (SIMPLIFIED)
# =========================


def fake_wakeword_detector():
    """Debug stand-in: auto-wakes whenever the assistant is IDLE.

    Fix: the original only slept inside the IDLE branch, busy-spinning a
    core whenever the state was anything else.
    """
    global state
    while True:
        if state == STATE_IDLE:
            state = STATE_LISTENING
            speak("Wie kann ich helfen?")
        time.sleep(0.1)


# ==========================
# WAKE WORD (PORCUPINE)
# ==========================


def real_wakeword_detector():
    """Background thread: listen for the built-in 'jarvis' wake word.

    Fixes vs. original: the audio callback now declares ``global state``
    (the assignment previously created a dead local and the wake-up never
    took effect); the IDLE check moved from around the stream setup (which
    made the thread exit immediately if started while not IDLE) into the
    callback; the keep-alive loop sleeps instead of spinning.
    """
    import pvporcupine
    import numpy as np

    # SECURITY: never commit real access keys. Prefer the environment;
    # the literal remains only as a backward-compatible fallback.
    ACCESS_KEY = os.environ.get(
        "PICOVOICE_ACCESS_KEY",
        "lpz+8e9omUnQtCQPeaawZauxVRqdhbcDH3fz19oZsp7zXKflWCiYMw==",
    )
    WAKEWORD = "jarvis"  # built-in wake word

    porcupine = pvporcupine.create(
        access_key=ACCESS_KEY,
        keywords=[WAKEWORD],
    )

    def callback(indata, frames, time_info, status):
        # Runs on sounddevice's audio thread for every frame_length block.
        global state  # BUG FIX: without this, the assignment below was local
        if state != STATE_IDLE:
            return  # only react to the wake word while idle
        pcm = np.frombuffer(indata, dtype=np.int16)
        if porcupine.process(pcm) >= 0:
            state = STATE_LISTENING
            print("WAKE WORD DETECTED")
            # NOTE(review): speak() blocks on two subprocesses; doing that
            # on the audio callback thread risks input overflows. Consider
            # signaling another thread instead — kept for behavior parity.
            speak("Ja, wie kann ich helfen?")

    with sd.InputStream(
        samplerate=porcupine.sample_rate,
        channels=1,
        dtype="int16",
        blocksize=porcupine.frame_length,
        callback=callback,
    ):
        print("Listening...")
        while True:
            time.sleep(1)  # keep the stream open without burning CPU


# =========================
# MAIN LOOP
# =========================


def main():
    """Start the STT and wake-word threads, then dispatch recognized text.

    The 0.1 s queue timeout keeps the loop responsive to Ctrl+C while
    avoiding a busy wait. (Swap in fake_wakeword_detector for debugging
    without a microphone/Porcupine key.)
    """
    threading.Thread(target=vosk_listener, daemon=True).start()
    threading.Thread(target=real_wakeword_detector, daemon=True).start()

    while True:
        try:
            text = audio_queue.get(timeout=0.1)
            handle_text(text)
        except queue.Empty:
            pass


if __name__ == "__main__":
    main()