Files
RasPi_Voice_Assistant--WIP/main.py

623 lines
18 KiB
Python

import threading
import queue
import json
import time
import os
import subprocess
import sounddevice as sd
import re
import asyncio
from weather_jetzt import get_weather_for_location
from timer_control import parse_time, start_timer, stop_timer, timer_status_info, format_duration
from wecker_control import parse_time_wecker, minutes_until, start_wecker, stop_wecker, wecker_status_info, calculate_target_datetime
from music_control import MusicPlayer
music_player = MusicPlayer()
music_should_resume = False
music_was_playing = False
# =========================
# KONFIGURATION
# =========================
VOSK_MODEL_PATH = "/home/tino/Desktop/Abschlussprojekt/test assistant/cloneAssistantAllInOne/vosk-model-de-0.21/"
PIPER_BIN = "piper"
PIPER_MODEL = "de_DE-thorsten-medium.onnx"
SAMPLE_RATE = 22050
# =========================
# STATES
# =========================
STATE_IDLE = "IDLE"
STATE_LISTENING = "LISTENING"
STATE_SPEAKING = "SPEAKING"
# =========================
# GLOBALER ZUSTAND
# =========================
state = STATE_IDLE
context = {
"intent": None,
"slots": {},
"required_slots": [],
"pending_slot": None,
"action": None
}
audio_queue = queue.Queue()
# =========================
# TTS (PIPER)
# =========================
def speak(text):
global state
state = STATE_SPEAKING
print(f"[TTS] {text}")
process = subprocess.Popen(
[PIPER_BIN, "--model", PIPER_MODEL, "--output-raw"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE
)
audio = process.communicate(input=text.encode("utf-8"))[0]
play = subprocess.Popen(
["aplay", "-r", str(SAMPLE_RATE), "-f", "S16_LE"],
stdin=subprocess.PIPE
)
play.communicate(audio)
state = STATE_LISTENING
# =========================
# INTENTS & SLOTS
# =========================
INTENTS = {
"standard_info": {
"keywords": ["was", "kannst", "welche"],
"required_slots": {}
},
"erweiterte_info": {
"keywords": ["info", "infos", "funktion", "erkläre"],
"required_slots": {
"selection": r"(list|wetter|timer|wecker|musik)"
}
},
"weather": {
"keywords": ["wetter", "temperatur", "regen"],
"required_slots": {
#"location": r"\bin\b\s*(\w+)"
"location": r"\bin\b\s*([^.,!?]+)"
},
"subactions": ["info"]
},
"timer": {
"keywords": ["timer"],
# "required_slots": {},
"actions":{
"start": {
"keywords": ["starte", "start", "beginne", "stelle"],
"required_slots": {
# "duration": r"(\w+)\s*(sekunden|sekunde|minuten|minute|stunden|stunde)"
"duration": r"((?:\w+)\s*(?:sekunden|sekunde|minuten|minute|stunden|stunde))"
},
},
"stop": {
"keywords": ["stopp", "stoppe", "beende"],
"required_slots": {}
},
"status": {
"keywords": ["status", "läuft", "noch"],
"required_slots": {}
}
}
},
"wecker": {
"keywords": ["wecker", "timer"],
"actions": {
"start": {
"keywords": ["erstelle", "stelle"],
"required_slots": {
"timeSet": r"\b([\w]+)\s*uhr(?:\s*([\w]+))?\b"
},
},
"stop": {
"keywords": ["stopp", "stoppe", "entferne"],
"required_slots": {}
},
"status": {
"keywords": ["status", "läuft", "noch"],
"required_slots": {}
}
},
},
"music": {
"keywords": ["musik", "music", "lied", "songs", "song"],
"actions": {
"play": {
"keywords": ["spiele", "spiel", "starte", "hören"],
"required_slots": {
"genre": r"(jazz|rock|pop)"
}
},
"pause": {
"keywords": ["pausiere", "pause"],
"required_slots": {}
},
"resume": {
"keywords": ["setze", "fort"],
"required_slots": {}
},
"next": {
"keywords": ["nächster", "nächsten", "nächsten", "weiter"],
"required_slots": {}
},
"previous": {
"keywords": ["vorheriger", "vorherigen", "zurück"],
"required_slots": {}
},
"stop": {
"keywords": ["stop", "stoppe", "beende"],
"required_slots": {}
}
}
}
}
def detect_intent(text):
text = text.lower()
for name, data in INTENTS.items():
if any(word in text for word in data["keywords"]):
return name
return None
# =========================
# SKILLS
# =========================
## WEATHER
def weather_skill(slots):
location = slots["location"]
location = re.sub(r"\b(heute|morgen|bitte|jetzt|aus)\b", "", location)
result = asyncio.run(get_weather_for_location(location))
# if result['location'] != slots["location"]:
# return f"Keine Wetterdaten für {location} gefunden"
if result:
return f"Aktuell sind es in {slots['location']} {result['temperatur']} Grad und die Wetterlage sieht {result['wetterlage']} aus."
else:
return f"Keine Wetterdaten für {location} verfügbar"
#return f"Das Wetter in {location} ist sonnig bei 20 Grad."
def einf_info_skill(slots):
global state
state == STATE_SPEAKING
return f"Ich bin Tscharvis, dein Persönlicher Sprachassistent. Ich kann dir sagen wie das wetter wird, einen Timer oder einen Wecker stellen und Musik verschiedener Schonre abspielen. Aktivieren tut man mich indem man, Hey Tscharvis, sagt und anschließend seinen Wunsch tätigt. Hast du Fragen zu den Funktionen? Dann schieß los. War das zu schnell? Dann kann ich dir die Funktionen auch nochmal auflisten"
def erw_info_skill(slots):
sel = slots["selection"]
if sel == "list":
return "Meine integrierten Funktionen sind: ein Wetterbericht, ein Timer, einen Wecker und die Musikwiedergabe."
elif sel == "wetter":
return "Für den Wetterbericht wird nach dem Wetter gefragt mit einer Ortsangabe. Zum Beispiel: wie wird das Wetter in Heilbronn??"
elif sel == "timer":
return "Bei dem Timer gibt es 3 verschiedene Funktionen: Das Stellen eines Timers, das Stoppen eines Timers und den Status des aktuellen Timers erfragen."
elif sel == "wecker":
return "Der Wecker hat 3 Funktionen: Wecker mit Angabe einer Uhrzeit stellen, den Wecker stoppen und den Status erfragen."
elif sel == "musik":
return "Der Musikplayer ist die größte Funktion, es kann: Musik nach Schonre abgespielt werden, sobalt die Musik am laufen ist, kann sie pausiert und fortgesetzt werden. Man kann zum nächsten und vorherigen Song springen und die Wiedergabe vollständig beenden"
else:
return "Dazu habe ich leider keine Informationen."
# if slots["selection"] == "list":
# def list_funktionen(slots):
# return f"Meine Integrierten Funktionen lauten: Wetterbericht, Timer, Wecker, Musik"
# elif slots["selection"] == "wetter":
# def info_wetter(slots):
# return f"Für den Wetterbericht wird nach dem Wetter gefragt mit einer Ortsangabe. Zum Beispiel: wie wird das Wetter in Heilbronn?"
# elif slots["selection"] == "timer":
# def info_timer(slots):
# return f"Bei dem Timer gibt es 3 verschiedene Funktionen: Das Stellen eines Timers, das Stoppen eines Timers und den Status des aktuellen Timers erfragen."
# elif slots["selection"] == "wecker":
# def info_wecker(slots):
# return f"Der Wecker hat 3 Funktionen: Wecker mit Angabe einer Uhrzeit stellen, den Wecker stoppen und den Status erfragen."
# elif slots["selection"] == "musik":
# def musik_info(slot):
# return f"Der Musikplayer ist die größte Funktion, es kann: Musik nach Schonre abgespielt werden, sobalt die Musik am laufen ist, kann sie pausiert und forgesetzt werden. Man kann zum nächsten und vorherigen Song springen und die Wiedergabe vollständig beenden"
## TIMER
def start_timer_skill(slots):
duration = slots["duration"]
seconds = parse_time(duration)
if seconds:
start_timer(seconds)
return f"Timer gestartet"
# return f"Der Timer für {duration} wurde gestartet."
def stopp_timer_skill(slots):
#return f"Timer wurde gestoppt."
stop_timer()
return f"Timer wurde gestoppt"
def status_timer_skill(slots):
# remaining = timer_status_info()["remaining"]
# return f"Status Timer Ausgabe {remaining}"
info = timer_status_info()
if info["status"] == "running":
remaining = info["remaining"]
return f"Der Timer läuft noch {format_duration(remaining)} "
elif info["status"] == "finished":
return f"Der Timer ist abgelaufen"
elif info["status"] == "stopped":
return f"Der Timer wurde gestoppt"
else:
return f"Es läuft kein Timer"
#ALARM
def start_wecker_skill(slots):
parsed = parse_time_wecker(slots["timeSet"])
if not parsed:
return "Die Uhrzeit konnte nicht erkannt werden."
hour, minute = parsed
target = calculate_target_datetime(hour, minute)
start_wecker(target)
minutes = minutes_until(target)
return f"Wecker für {hour:02d}:{minute:02d} wurde gestellt er klingelt in {minutes} minuten"
def stopp_wecker_skill(slots):
stop_wecker()
print("Wecker wurde gestoppt")
return f"Wecker wurde gestoppt"
def status_wecker_skill(slots):
info = wecker_status_info()
if info["status"] == "running":
target_time = info["target_time"]
return f"Ein Wecker wurde bereits auf {wecker_target_time} gestellt"
elif info["status"] == "finished":
return f"Der Wecker ist abgelaufen"
elif info["status"] == "stopped":
return f"Der Wecker wurde gestoppt, es ist kein Wecker gestellt"
else:
return f"Es ist kein Wecker gestellt"
def play_music_skill(slots):
genre = slots["genre"]
if not music_player.load_genre(genre):
return f"Keine Musik im Genre {genre} gefunden"
speak(f"{genre} Musik wird abgespielt")
music_player.play()
return ""
def pause_music_skill(slots):
music_player.pause()
return "Musik pausiert."
def resume_music_skill(slots):
music_player.resume()
return "Musik fortgesetzt"
def next_music_skill(slots):
music_player.next_song()
return "Nächster Song."
def previous_music_skill(slots):
music_player.previous_song()
return "Vorheriger Song."
def stop_music_skill(slots):
music_player.stop()
return "Musik wurde vollständig gestoppt."
SKILLS = {
"weather": weather_skill,
"timer": {
"start": start_timer_skill,
"stop": stopp_timer_skill,
"status": status_timer_skill,
},
"standard_info": einf_info_skill,
"erweiterte_info": erw_info_skill,
"wecker": {
"start": start_wecker_skill,
"stop": stopp_wecker_skill,
"status": status_wecker_skill,
},
"music": {
"play": play_music_skill,
"pause": pause_music_skill,
"resume": resume_music_skill,
"next": next_music_skill,
"previous": previous_music_skill,
"stop": stop_music_skill
}
}
# =========================
# DIALOGLOGIK
# =========================
def handle_text(text):
global context, state, music_was_playing
if state != STATE_LISTENING:
return
print(f"[STT] {text}")
# 1. Intent erkennen
if not context["intent"]:
intent = detect_intent(text)
if not intent:
speak("Das habe ich nicht verstanden.")
if music_was_playing:
music_player.play()
music_was_playing = False
reset_context()
state = STATE_LISTENING
return
context["intent"] = intent
##NOCHMAL GENAUER ERKLÄREN LASSEN (instatt in check reqired() nach required slots check nach open nach oben verschoben damit required slots funktoinieren )
intent_data = INTENTS[context["intent"]]
actions = intent_data.get("actions")
if actions:
for action_name, action_data in actions.items():
if any(k in text for k in action_data.get("keywords", [])):
context["action"] = action_name
break
#Edgecase falls nutzer befehl bei dem action benötigt wird ohne action angibt
if INTENTS[context["intent"]].get("actions") and context["action"] is None:
speak("Ungültige Eingabe, Aktion wurde nicht genannt")
return False
if context["action"] == None:
context["required_slots"] = INTENTS[context["intent"]]["required_slots"]
else:
context["required_slots"] = INTENTS[context["intent"]]["actions"][context["action"]]["required_slots"]
# 2. Fehlende Slots prüfen
if not check_required(text):
return
# 4. Skill ausführen
if context["action"] == None:
result = SKILLS[context["intent"]](context["slots"])
else:
result = SKILLS[context["intent"]][context["action"]](context["slots"])
speak(result)
reset_context()
def check_required(text):
intent_data = INTENTS[context["intent"]]
text = text.lower()
#for slot, pattern in intent_data.get("required_slots", {}).items():
for slot, pattern in context["required_slots"].items():
if slot not in context["slots"]:
match = re.search(pattern, text)
if match:
if slot == "timeSet":
context["slots"][slot] = match.group(0)
else:
context["slots"][slot] = match.group(1) #alles slots
else:
context["pending_slot"] = slot
ask_for_slot(slot)
return False
#Edgecase falls nutzer befehl bei dem action benötigt wird ohne action angibt
# if INTENTS[context["intent"]].get("actions") and context["action"] is None:
# speak("Ungültige Eingabe, Aktion wurde nicht genannt")
# return False
context["pending_slot"] = None
return True
def ask_for_slot(slot):
questions = {
"location": "Für welchen Ort?",
"duration": "Wie lange soll der Timer laufen?",
"timeSet": "Zu welcher Uhrzeit soll der Wecker klingeln?",
"genre": "Welches Musikgenre möchtest du hören?",
"selection": "Zu welcher Funktoin möchtest du mehr wissen?"
}
speak(questions.get(slot, "Bitte spezifizieren."))
def reset_context():
global context, state
context = {
"intent": None,
"slots": {},
"required_slots": [],
"pending_slot": None,
"action": None
}
state = STATE_IDLE
# state = STATE_LISTENING manuell
# =========================
# VOSK LISTENER
# =========================
def vosk_listener():
SAMPLE_RATE_VOSK = 16000
from vosk import Model, KaldiRecognizer
import pyaudio
model = Model(VOSK_MODEL_PATH)
rec = KaldiRecognizer(model, SAMPLE_RATE_VOSK)
p = pyaudio.PyAudio()
stream = p.open(
format=pyaudio.paInt16,
channels=1,
rate=SAMPLE_RATE_VOSK,
input=True,
frames_per_buffer=4000
)
# stream.start_stream()
while True:
if state == STATE_LISTENING: #hinzugefügt um fehlerhafte eingaben zu stoppen
data = stream.read(4000, exception_on_overflow=False)
if rec.AcceptWaveform(data):
result = json.loads(rec.Result())
text = result.get("text", "")
if text:
audio_queue.put(text)
else:
rec.Reset()
# =========================
# WAKEWORD (SIMPLIFIZIERT)
# =========================
"""
def fake_wakeword_detector():
global state
while True:
if state == STATE_IDLE:
time.sleep(0.1)
state = STATE_LISTENING
speak("Wie kann ich helfen?")
"""
# ==========================
# WAKEWORD (PORCUPINE)
# ==========================
def real_wakeword_detector():
import pvporcupine
import numpy as np
global state
ACCESS_KEY = "lpz+8e9omUnQtCQPeaawZauxVRqdhbcDH3fz19oZsp7zXKflWCiYMw=="
WAKEWORD = "jarvis" # built-in wake word
porcupine = pvporcupine.create(
access_key=ACCESS_KEY,
keywords=[WAKEWORD]
)
if state == STATE_IDLE and state != STATE_SPEAKING:
def callback(indata, frames, time_info, status):
pcm = np.frombuffer(indata, dtype=np.int16)
result = porcupine.process(pcm)
if result >= 0:
#FUNKTOIN FÜR MUSIKPLAYER
if music_player.playing:
music_was_playing = True
music_player.pause()
else:
music_was_playing = False
time.sleep(1) #verbesserung der spracheingabe: wurde hinzugefügt weil es sonst worte halluziniert (wie "eine", "jarvis")
state = STATE_LISTENING
print("WAKE WORD DETECTED")
#speak("Ja, wie kann ich helfen?")
speak("Ja?")
with sd.InputStream(
samplerate=porcupine.sample_rate,
channels=1,
dtype="int16",
blocksize=porcupine.frame_length,
callback=callback,
):
print("Listening...")
while True:
pass
# =========================
# MAIN LOOP
# =========================
def main():
threading.Thread(target=vosk_listener, daemon=True).start()
# threading.Thread(target=fake_wakeword_detector, daemon=True).start()
threading.Thread(target=real_wakeword_detector, daemon=True).start()
while True:
try:
text = audio_queue.get(timeout=0.1)
handle_text(text)
except queue.Empty:
pass
#main manuelle input
""" def main():
global state
state = STATE_LISTENING
while True:
text = input("Text input: ")
handle_text(text)
"""
if __name__ == "__main__":
main()