Compare commits

...

5 Commits

20 changed files with 612 additions and 86 deletions

Binary file not shown.

Binary file not shown.

268
main.py
View File

@@ -7,10 +7,17 @@ import subprocess
import sounddevice as sd import sounddevice as sd
import re import re
import asyncio import asyncio
from weather_jetzt import get_weather_for_location from weather_jetzt import get_weather_for_location
from timer_control import parse_time, start_timer, stop_timer, timer_status_info, format_duration from timer_control import parse_time, start_timer, stop_timer, timer_status_info, format_duration
from wecker_control import parse_time_wecker, minutes_until, start_wecker, stop_wecker, wecker_status_info, calculate_target_datetime
from music_control import MusicPlayer
music_player = MusicPlayer()
music_should_resume = False
music_was_playing = False
#test
# ========================= # =========================
# KONFIGURATION # KONFIGURATION
@@ -29,6 +36,8 @@ STATE_IDLE = "IDLE"
STATE_LISTENING = "LISTENING" STATE_LISTENING = "LISTENING"
STATE_SPEAKING = "SPEAKING" STATE_SPEAKING = "SPEAKING"
# ========================= # =========================
# GLOBALER ZUSTAND # GLOBALER ZUSTAND
# ========================= # =========================
@@ -75,14 +84,28 @@ def speak(text):
# ========================= # =========================
INTENTS = { INTENTS = {
"standard_info": {
"keywords": ["was", "kannst", "welche"],
"required_slots": {}
},
"erweiterte_info": {
"keywords": ["info", "infos", "funktion", "erkläre"],
"required_slots": {
"selection": r"(list|wetter|timer|wecker|musik)"
}
},
"weather": { "weather": {
"keywords": ["wetter", "temperatur", "regen"], "keywords": ["wetter", "temperatur", "regen"],
"required_slots": { "required_slots": {
"location": r"\bin\b\s*(\w+)" #"location": r"\bin\b\s*(\w+)"
"location": r"\bin\b\s*([^.,!?]+)"
}, },
"subactions": ["info"] "subactions": ["info"]
}, },
"timer": { "timer": {
"keywords": ["timer"], "keywords": ["timer"],
# "required_slots": {}, # "required_slots": {},
@@ -93,7 +116,7 @@ INTENTS = {
# "duration": r"(\w+)\s*(sekunden|sekunde|minuten|minute|stunden|stunde)" # "duration": r"(\w+)\s*(sekunden|sekunde|minuten|minute|stunden|stunde)"
"duration": r"((?:\w+)\s*(?:sekunden|sekunde|minuten|minute|stunden|stunde))" "duration": r"((?:\w+)\s*(?:sekunden|sekunde|minuten|minute|stunden|stunde))"
} },
}, },
"stop": { "stop": {
"keywords": ["stopp", "stoppe", "beende"], "keywords": ["stopp", "stoppe", "beende"],
@@ -104,11 +127,59 @@ INTENTS = {
"required_slots": {} "required_slots": {}
} }
} }
},
"wecker": {
"keywords": ["wecker", "timer"],
"actions": {
"start": {
"keywords": ["erstelle", "stelle"],
"required_slots": {
"timeSet": r"\b([\w]+)\s*uhr(?:\s*([\w]+))?\b"
},
},
"stop": {
"keywords": ["stopp", "stoppe", "entferne"],
"required_slots": {}
},
"status": {
"keywords": ["status", "läuft", "noch"],
"required_slots": {}
}
},
},
"music": {
"keywords": ["musik", "music", "lied", "songs", "song"],
"actions": {
"play": {
"keywords": ["spiele", "spiel", "starte", "hören"],
"required_slots": {
"genre": r"(jazz|rock|pop)"
}
},
"pause": {
"keywords": ["pausiere", "pause"],
"required_slots": {}
},
"resume": {
"keywords": ["setze", "fort"],
"required_slots": {}
},
"next": {
"keywords": ["nächster", "nächsten", "nächsten", "weiter"],
"required_slots": {}
},
"previous": {
"keywords": ["vorheriger", "vorherigen", "zurück"],
"required_slots": {}
},
"stop": {
"keywords": ["stop", "stoppe", "beende"],
"required_slots": {}
}
}
} }
} }
# evtl mit regex überarbeiten
def detect_intent(text): def detect_intent(text):
text = text.lower() text = text.lower()
for name, data in INTENTS.items(): for name, data in INTENTS.items():
@@ -122,17 +193,65 @@ def detect_intent(text):
# ========================= # =========================
## WEATHER
def weather_skill(slots): def weather_skill(slots):
location = slots["location"] location = slots["location"]
location = re.sub(r"\b(heute|morgen|bitte|jetzt|aus)\b", "", location)
result = asyncio.run(get_weather_for_location(location)) result = asyncio.run(get_weather_for_location(location))
# if result['location'] != slots["location"]:
# return f"Keine Wetterdaten für {location} gefunden"
if result: if result:
return f"Aktuell sind es in {result['location']} {result['temperatur']} Grad und die Wetterlage sieht {result['wetterlage']} aus." return f"Aktuell sind es in {slots['location']} {result['temperatur']} Grad und die Wetterlage sieht {result['wetterlage']} aus."
else: else:
return f"Keine Wetterdaten verfügbar" return f"Keine Wetterdaten für {location} verfügbar"
#return f"Das Wetter in {location} ist sonnig bei 20 Grad." #return f"Das Wetter in {location} ist sonnig bei 20 Grad."
def einf_info_skill(slots):
global state
state == STATE_SPEAKING
return f"Ich bin Tscharvis, dein Persönlicher Sprachassistent. Ich kann dir sagen wie das wetter wird, einen Timer oder einen Wecker stellen und Musik verschiedener Schonre abspielen. Aktivieren tut man mich indem man, Hey Tscharvis, sagt und anschließend seinen Wunsch tätigt. Hast du Fragen zu den Funktionen? Dann schieß los. War das zu schnell? Dann kann ich dir die Funktionen auch nochmal auflisten"
def erw_info_skill(slots):
sel = slots["selection"]
if sel == "list":
return "Meine integrierten Funktionen sind: ein Wetterbericht, ein Timer, einen Wecker und die Musikwiedergabe."
elif sel == "wetter":
return "Für den Wetterbericht wird nach dem Wetter gefragt mit einer Ortsangabe. Zum Beispiel: wie wird das Wetter in Heilbronn??"
elif sel == "timer":
return "Bei dem Timer gibt es 3 verschiedene Funktionen: Das Stellen eines Timers, das Stoppen eines Timers und den Status des aktuellen Timers erfragen."
elif sel == "wecker":
return "Der Wecker hat 3 Funktionen: Wecker mit Angabe einer Uhrzeit stellen, den Wecker stoppen und den Status erfragen."
elif sel == "musik":
return "Der Musikplayer ist die größte Funktion, es kann: Musik nach Schonre abgespielt werden, sobalt die Musik am laufen ist, kann sie pausiert und fortgesetzt werden. Man kann zum nächsten und vorherigen Song springen und die Wiedergabe vollständig beenden"
else:
return "Dazu habe ich leider keine Informationen."
# if slots["selection"] == "list":
# def list_funktionen(slots):
# return f"Meine Integrierten Funktionen lauten: Wetterbericht, Timer, Wecker, Musik"
# elif slots["selection"] == "wetter":
# def info_wetter(slots):
# return f"Für den Wetterbericht wird nach dem Wetter gefragt mit einer Ortsangabe. Zum Beispiel: wie wird das Wetter in Heilbronn?"
# elif slots["selection"] == "timer":
# def info_timer(slots):
# return f"Bei dem Timer gibt es 3 verschiedene Funktionen: Das Stellen eines Timers, das Stoppen eines Timers und den Status des aktuellen Timers erfragen."
# elif slots["selection"] == "wecker":
# def info_wecker(slots):
# return f"Der Wecker hat 3 Funktionen: Wecker mit Angabe einer Uhrzeit stellen, den Wecker stoppen und den Status erfragen."
# elif slots["selection"] == "musik":
# def musik_info(slot):
# return f"Der Musikplayer ist die größte Funktion, es kann: Musik nach Schonre abgespielt werden, sobalt die Musik am laufen ist, kann sie pausiert und forgesetzt werden. Man kann zum nächsten und vorherigen Song springen und die Wiedergabe vollständig beenden"
## TIMER
def start_timer_skill(slots): def start_timer_skill(slots):
duration = slots["duration"] duration = slots["duration"]
seconds = parse_time(duration) seconds = parse_time(duration)
@@ -162,7 +281,71 @@ def status_timer_skill(slots):
else: else:
return f"Es läuft kein Timer" return f"Es läuft kein Timer"
#ALARM
def start_wecker_skill(slots):
parsed = parse_time_wecker(slots["timeSet"])
if not parsed:
return "Die Uhrzeit konnte nicht erkannt werden."
hour, minute = parsed
target = calculate_target_datetime(hour, minute)
start_wecker(target)
minutes = minutes_until(target)
return f"Wecker für {hour:02d}:{minute:02d} wurde gestellt er klingelt in {minutes} minuten"
def stopp_wecker_skill(slots):
stop_wecker()
print("Wecker wurde gestoppt")
return f"Wecker wurde gestoppt"
def status_wecker_skill(slots):
info = wecker_status_info()
if info["status"] == "running":
target_time = info["target_time"]
return f"Ein Wecker wurde bereits auf {wecker_target_time} gestellt"
elif info["status"] == "finished":
return f"Der Wecker ist abgelaufen"
elif info["status"] == "stopped":
return f"Der Wecker wurde gestoppt, es ist kein Wecker gestellt"
else:
return f"Es ist kein Wecker gestellt"
def play_music_skill(slots):
genre = slots["genre"]
if not music_player.load_genre(genre):
return f"Keine Musik im Genre {genre} gefunden"
speak(f"{genre} Musik wird abgespielt")
music_player.play()
return ""
def pause_music_skill(slots):
music_player.pause()
return "Musik pausiert."
def resume_music_skill(slots):
music_player.resume()
return "Musik fortgesetzt"
def next_music_skill(slots):
music_player.next_song()
return "Nächster Song."
def previous_music_skill(slots):
music_player.previous_song()
return "Vorheriger Song."
def stop_music_skill(slots):
music_player.stop()
return "Musik wurde vollständig gestoppt."
SKILLS = { SKILLS = {
@@ -170,7 +353,22 @@ SKILLS = {
"timer": { "timer": {
"start": start_timer_skill, "start": start_timer_skill,
"stop": stopp_timer_skill, "stop": stopp_timer_skill,
"status": status_timer_skill "status": status_timer_skill,
},
"standard_info": einf_info_skill,
"erweiterte_info": erw_info_skill,
"wecker": {
"start": start_wecker_skill,
"stop": stopp_wecker_skill,
"status": status_wecker_skill,
},
"music": {
"play": play_music_skill,
"pause": pause_music_skill,
"resume": resume_music_skill,
"next": next_music_skill,
"previous": previous_music_skill,
"stop": stop_music_skill
} }
} }
@@ -179,7 +377,7 @@ SKILLS = {
# ========================= # =========================
def handle_text(text): def handle_text(text):
global context, state global context, state, music_was_playing
if state != STATE_LISTENING: if state != STATE_LISTENING:
return return
@@ -191,7 +389,11 @@ def handle_text(text):
intent = detect_intent(text) intent = detect_intent(text)
if not intent: if not intent:
speak("Das habe ich nicht verstanden.") speak("Das habe ich nicht verstanden.")
if music_was_playing:
music_player.play()
music_was_playing = False
reset_context() reset_context()
state = STATE_LISTENING
return return
context["intent"] = intent context["intent"] = intent
@@ -246,7 +448,11 @@ def check_required(text):
if slot not in context["slots"]: if slot not in context["slots"]:
match = re.search(pattern, text) match = re.search(pattern, text)
if match: if match:
context["slots"][slot] = match.group(1) # schau an if slot == "timeSet":
context["slots"][slot] = match.group(0)
else:
context["slots"][slot] = match.group(1) #alles slots
else: else:
context["pending_slot"] = slot context["pending_slot"] = slot
ask_for_slot(slot) ask_for_slot(slot)
@@ -268,7 +474,10 @@ def check_required(text):
def ask_for_slot(slot): def ask_for_slot(slot):
questions = { questions = {
"location": "Für welchen Ort?", "location": "Für welchen Ort?",
"duration": "Wie lange soll der Timer laufen?" "duration": "Wie lange soll der Timer laufen?",
"timeSet": "Zu welcher Uhrzeit soll der Wecker klingeln?",
"genre": "Welches Musikgenre möchtest du hören?",
"selection": "Zu welcher Funktoin möchtest du mehr wissen?"
} }
speak(questions.get(slot, "Bitte spezifizieren.")) speak(questions.get(slot, "Bitte spezifizieren."))
@@ -282,14 +491,14 @@ def reset_context():
"pending_slot": None, "pending_slot": None,
"action": None "action": None
} }
#state = STATE_IDLE state = STATE_IDLE
state = STATE_LISTENING # state = STATE_LISTENING manuell
# ========================= # =========================
# VOSK LISTENER # VOSK LISTENER
# ========================= # =========================
"""
def vosk_listener(): def vosk_listener():
SAMPLE_RATE_VOSK = 16000 SAMPLE_RATE_VOSK = 16000
from vosk import Model, KaldiRecognizer from vosk import Model, KaldiRecognizer
@@ -309,7 +518,7 @@ def vosk_listener():
# stream.start_stream() # stream.start_stream()
while True: while True:
if state != STATE_SPEAKING: #hinzugefügt um fehlerhafte eingaben zu stoppen if state == STATE_LISTENING: #hinzugefügt um fehlerhafte eingaben zu stoppen
data = stream.read(4000, exception_on_overflow=False) data = stream.read(4000, exception_on_overflow=False)
if rec.AcceptWaveform(data): if rec.AcceptWaveform(data):
result = json.loads(rec.Result()) result = json.loads(rec.Result())
@@ -319,7 +528,7 @@ def vosk_listener():
else: else:
rec.Reset() rec.Reset()
"""
# ========================= # =========================
# WAKEWORD (SIMPLIFIZIERT) # WAKEWORD (SIMPLIFIZIERT)
# ========================= # =========================
@@ -336,7 +545,7 @@ def fake_wakeword_detector():
# WAKEWORD (PORCUPINE) # WAKEWORD (PORCUPINE)
# ========================== # ==========================
"""
def real_wakeword_detector(): def real_wakeword_detector():
import pvporcupine import pvporcupine
import numpy as np import numpy as np
@@ -350,12 +559,20 @@ def real_wakeword_detector():
access_key=ACCESS_KEY, access_key=ACCESS_KEY,
keywords=[WAKEWORD] keywords=[WAKEWORD]
) )
if state == STATE_IDLE: if state == STATE_IDLE and state != STATE_SPEAKING:
def callback(indata, frames, time_info, status): def callback(indata, frames, time_info, status):
pcm = np.frombuffer(indata, dtype=np.int16) pcm = np.frombuffer(indata, dtype=np.int16)
result = porcupine.process(pcm) result = porcupine.process(pcm)
if result >= 0: if result >= 0:
#FUNKTOIN FÜR MUSIKPLAYER
if music_player.playing:
music_was_playing = True
music_player.pause()
else:
music_was_playing = False
time.sleep(1) #verbesserung der spracheingabe: wurde hinzugefügt weil es sonst worte halluziniert (wie "eine", "jarvis") time.sleep(1) #verbesserung der spracheingabe: wurde hinzugefügt weil es sonst worte halluziniert (wie "eine", "jarvis")
state = STATE_LISTENING state = STATE_LISTENING
print("WAKE WORD DETECTED") print("WAKE WORD DETECTED")
@@ -374,12 +591,12 @@ def real_wakeword_detector():
while True: while True:
pass pass
"""
# ========================= # =========================
# MAIN LOOP # MAIN LOOP
# ========================= # =========================
""" def main(): def main():
threading.Thread(target=vosk_listener, daemon=True).start() threading.Thread(target=vosk_listener, daemon=True).start()
# threading.Thread(target=fake_wakeword_detector, daemon=True).start() # threading.Thread(target=fake_wakeword_detector, daemon=True).start()
threading.Thread(target=real_wakeword_detector, daemon=True).start() threading.Thread(target=real_wakeword_detector, daemon=True).start()
@@ -389,15 +606,16 @@ def real_wakeword_detector():
text = audio_queue.get(timeout=0.1) text = audio_queue.get(timeout=0.1)
handle_text(text) handle_text(text)
except queue.Empty: except queue.Empty:
pass """ pass
def main(): #main manuelle input
""" def main():
global state global state
state = STATE_LISTENING state = STATE_LISTENING
while True: while True:
text = input("Text input: ") text = input("Text input: ")
handle_text(text) handle_text(text)
"""
if __name__ == "__main__": if __name__ == "__main__":

93
music_control.py Normal file
View File

@@ -0,0 +1,93 @@
import vlc
import os
import threading
class MusicPlayer:
def __init__(self, base_path="/home/tino/Desktop/Abschlussprojekt/test assistant/cloneAssistantAllInOne/RasPi_Voice_Assistant--WIP/musik/"):
self.base_path = base_path
self.instance = vlc.Instance()
self.player = self.instance.media_player_new()
self.lock = threading.Lock()
self.files = []
self.current_index = 0
self.playing = False
self.paused = False
events = self.player.event_manager()
events.event_attach(
vlc.EventType.MediaPlayerEndReached,
self._song_ended
)
def load_genre(self, genre):
folder = os.path.join(self.base_path, genre)
if not os.path.isdir(folder):
return False
if self.playing or self.paused:
self.player.stop()
self.playing = False
self.paused = False
self.files = [
os.path.join(folder, f)
for f in os.listdir(folder)
if f.endswith((".mp3", ".wav"))
]
self.current_index = 0
return bool(self.files)
def _play_current(self):
with self.lock:
media = self.instance.media_new(self.files[self.current_index])
self.player.set_media(media)
self.player.play()
self.playing = True
def _song_ended(self, event):
self.next_song()
def play(self):
if not self.files:
return False
if self.paused:
return self.resume()
if not self.playing:
self._play_current()
return True
def pause(self):
if self.playing:
self.player.pause()
self.playing = False
self.paused = True
def resume(self):
if self.paused:
self.player.play()
self.playing = True
self.paused = False
return True
return False
def stop(self):
self.player.stop()
self.playing = False
self.paused = False
self.current_index = 0
def next_song(self):
with self.lock:
self.current_index = (self.current_index + 1) % len(self.files)
self._play_current()
def previous_song(self):
with self.lock:
self.current_index = (self.current_index - 1) % len(self.files)
self._play_current()

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -101,13 +101,33 @@ def timer_status_info():
def format_duration(seconds): def format_duration(seconds):
if seconds < 60: if seconds < 60:
return f"{seconds} Sekunden" return f"{seconds} {second_text(seconds)}"
elif seconds < 3600:
minutes = seconds // 60 minutes = seconds // 60
return f"{minutes} Minuten" secs = seconds % 60
else:
hours = seconds // 3600 if minutes < 10:
return f"{hours} Stunden" if secs == 0:
return f"{minutes} {minute_text(minutes)}"
return f"{minutes} {minute_text(minutes)} und {secs} {second_text(secs)}"
if minutes < 60:
return f"{minutes} {minute_text(minutes)}"
hours = minutes // 60
mins = minutse % 60
if mins == 0:
return f"{hours} {hour_text(hours)}"
return f"{hours} {hour_text(hours)} und {mins} {minute_text(mins)}"
def minute_text(n):
return "Minute" if n == 1 else "Minuten"
def second_text(n):
return "Sekunde" if n == 1 else "Sekunden"
def hour_text(n):
return "Stunde" if n == 1 else "Stunden"

View File

@@ -3,6 +3,42 @@ import python_weather
import asyncio import asyncio
from datetime import date, datetime, timedelta from datetime import date, datetime, timedelta
def normalize_city(name: str):
DE_TO_EN_CITY = {
# Deutschland (nur wenn DE ≠ EN)
"köln": "Cologne",
"münchen": "Munich",
"nürnberg": "Nuremberg",
"frankfurt am main": "Frankfurt",
"düsseldorf": "Dusseldorf",
"mönchengladbach": "Moenchengladbach",
"saarbrücken": "Saarbruecken",
"würzburg": "Wuerzburg",
"osnabrück": "Osnabrueck",
"göttingen": "Göttingen", # Da im Englischen oft "Goettingen"
# Europa / Ausland (nur wenn DE ≠ EN)
"wien": "Vienna",
"prag": "Prague",
"mailand": "Milan",
"genf": "Geneva",
"brüssel": "Brussels",
"athen": "Athens",
"koppenhagen": "Copenhagen",
"rom": "Rome",
"warszawa": "Warsaw",
"warschau": "Warsaw",
"sankt petersburg": "Saint Petersburg",
"st. petersburg": "Saint Petersburg",
"kiew": "Kyiv",
"florenz": "Florence",
"venedig": "Venice",
"neapel": "Naples",
"andorra la vella": "Andorra la Vella",
}
return DE_TO_EN_CITY.get(name.lower(), name)
async def get_weather_for_location(location: str): async def get_weather_for_location(location: str):
@@ -11,66 +47,78 @@ async def get_weather_for_location(location: str):
now = datetime.now().time() now = datetime.now().time()
today = date.today() today = date.today()
try:
# Declare the client. The measuring unit used defaults to the metric system (celcius, km/h, etc.)
async with python_weather.Client(unit=python_weather.METRIC) as client:
# Fetch a weather forecast from a city.
weather = await client.get(location) ##e. g. Brackenheim
# weather ist dein Forecast-Objekt von python_weather # Declare the client. The measuring unit used defaults to the metric system (celcius, km/h, etc.)
today_forecast = next( async with python_weather.Client(unit=python_weather.METRIC) as client:
(daily for daily in weather if daily.date == today),
None # Fetch a weather forecast from a city.
) location_normalized = normalize_city(location)
weather = await client.get(location_normalized) ##e. g. Brackenheim
def closest_hourly(hourlies, now_time): # weather ist dein Forecast-Objekt von python_weather
closest = None today_forecast = next(
min_diff = None (daily for daily in weather if daily.date == today),
for hourly in hourlies: None
# Differenz zwischen aktueller Uhrzeit und Forecast-Stunde )
diff = abs(
datetime.combine(date.today(), hourly.time) -
datetime.combine(date.today(), now_time)
)
if min_diff is None or diff < min_diff:
min_diff = diff
closest = hourly
return closest
current_hourly = closest_hourly(today_forecast.hourly_forecasts, now) def closest_hourly(hourlies, now_time):
closest = None
min_diff = None
for hourly in hourlies:
# Differenz zwischen aktueller Uhrzeit und Forecast-Stunde
diff = abs(
datetime.combine(date.today(), hourly.time) -
datetime.combine(date.today(), now_time)
)
if min_diff is None or diff < min_diff:
min_diff = diff
closest = hourly
return closest
KIND_TO_DE = {
"SUNNY": "sonnig", if not today_forecast or not today_forecast.hourly_forecasts:
"PARTLY_CLOUDY": "teilweise bewölkt", return None
"CLOUDY": "bewölkt", if len(location_normalized) != len(weather.location):
"VERY_CLOUDY": "stark bewölkt", return None
"FOG": "nebelig",
"LIGHT_SHOWERS": "nach leichtem Regen",
"LIGHT_SLEET_SHOWERS": "nach leichtem Schneeregen",
"LIGHT_SLEET": "nach leichtem Schneeregen",
"THUNDERY_SHOWERS": "nach Gewitterregen",
"LIGHT_SNOW": "nach leichtem Schneefall",
"HEAVY_SNOW": "nach starkem Schneefall",
"LIGHT_RAIN": "nach leichtem Regen",
"HEAVY_SHOWERS": "nach starken Schauer",
"HEAVY_RAIN": "nach starkem Regen",
"LIGHT_SNOW_SHOWERS": "nach leichten Schneeschauern",
"HEAVY_SNOW_SHOWERS": "nach starken Schneeschauern",
"THUNDERY_HEAVY_RAIN": "nach starken Gewitterregen",
"THUNDERY_SNOW_SHOWERS": "nach starken Gewitterschnee",
}
wetterlage = KIND_TO_DE.get(current_hourly.kind.name, "unbegständig") current_hourly = closest_hourly(today_forecast.hourly_forecasts, now)
temperatur = current_hourly.temperature
return {
"location": weather.location,
"temperatur": temperatur,
"wetterlage": wetterlage KIND_TO_DE = {
} "SUNNY": "nach freiem Himmel",
"PARTLY_CLOUDY": "teilweise bewölkt",
"CLOUDY": "bewölkt",
"VERY_CLOUDY": "stark bewölkt",
"FOG": "nebelig",
"LIGHT_SHOWERS": "nach leichtem Regen",
"LIGHT_SLEET_SHOWERS": "nach leichtem Schneeregen",
"LIGHT_SLEET": "nach leichtem Schneeregen",
"THUNDERY_SHOWERS": "nach Gewitterregen",
"LIGHT_SNOW": "nach leichtem Schneefall",
"HEAVY_SNOW": "nach starkem Schneefall",
"LIGHT_RAIN": "nach leichtem Regen",
"HEAVY_SHOWERS": "nach starken Schauer",
"HEAVY_RAIN": "nach starkem Regen",
"LIGHT_SNOW_SHOWERS": "nach leichten Schneeschauern",
"HEAVY_SNOW_SHOWERS": "nach starken Schneeschauern",
"THUNDERY_HEAVY_RAIN": "nach starken Gewitterregen",
"THUNDERY_SNOW_SHOWERS": "nach starken Gewitterschnee",
}
wetterlage = KIND_TO_DE.get(current_hourly.kind.name, "unbegständig")
temperatur = current_hourly.temperature
return {
"location": weather.location,
"temperatur": temperatur,
"wetterlage": wetterlage
}
except python_weather.errors.RequestError:
return None
#print(f"Aktuell sind es in {ort} {temperatur} Grad und die Wetterlage sieht {wetterlage} aus.") #print(f"Aktuell sind es in {ort} {temperatur} Grad und die Wetterlage sieht {wetterlage} aus.")

147
wecker_control.py Normal file
View File

@@ -0,0 +1,147 @@
import time
import threading
from text2numde import text2num, is_number, sentence2num
from playsound3 import playsound
from datetime import datetime, timedelta
import re
import math
wecker_thread = None
wecker_stop = threading.Event()
wecker_status = "idle"
wecker_target_time: datetime | None = None
def parse_time_wecker(text):
text = text.lower().strip()
match = re.search(r"\b([\w]+)\s*uhr(?:\s*([\w]+))?\b", text)
if not match:
return None
hour_text = match.group(1)
minute_text= match.group(2)
try:
hour = text2num(hour_text)
minute = text2num(minute_text) if minute_text else 0
except ValueError:
print("Konnte die Zahl nicht erkennen.")
return None
if hour < 0 or hour > 23:
return None
if minute < 0 or minute > 59:
return None
print(f"{hour:02d}:{minute:02d}")
return hour, minute
def calculate_target_datetime(hour: int, minute: int) -> datetime:
now = datetime.now()
target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
if target <= now:
target += timedelta(days=1)
return target
def minutes_until(target: datetime) -> int:
#return int(((target - now)).total_seconds() // 60) hatte das davor aber das hat abgerundet
seconds = (target - datetime.now()).total_seconds()
return math.ceil(seconds / 60)
def start_wecker(target_time: datetime):
global wecker_thread, wecker_status, wecker_target_time
if wecker_status == "running":
return False # wecker läuft bereits
wecker_stop.clear()
wecker_target_time = target_time
wecker_status = "running"
def run():
global wecker_status
seconds = (wecker_target_time - datetime.now()).total_seconds()
if seconds < 0:
seconds = 0
if not wecker_stop.wait(seconds):
wecker_status = "finished"
print("wecker abgelaufen") # """ TTS """
wecker_thread = threading.Thread(target=run, daemon=True)
wecker_thread.start()
return True
"""
print(f"wecker startet für {seconds} Sekunden...")
time.sleep(seconds)
print("wecker abgelaufen!")
playsound("/home/tino/Desktop/Abschlussprojekt/test assistant/cloneAssistantAllInOne/RasPi_Voice_Assistant--WIP/clock-alarm-8761.mp3")
sound.stop()
"""
""" # Beispielnutzung
spoken_input = "eine sekunde" # This could come from a voice assistant
seconds = parse_time(spoken_input)
if seconds:
start_wecker(seconds) """
def stop_wecker():
global wecker_status
if wecker_status != "running":
print("Kein wecker gestellt")
return False
wecker_stop.set()
wecker_status = "stopped"
return True
def wecker_status_info():
if wecker_status != "running":
return {"status": wecker_status, "target_time": 0}
return {
"status": "running",
"target_time": wecker_target_time
}
def format_duration(seconds):
if seconds < 60:
return f"{seconds} {second_text(seconds)}"
minutes = seconds // 60
secs = seconds % 60
if minutes < 10:
if secs == 0:
return f"{minutes} {minute_text(minutes)}"
return f"{minutes} {minute_text(minutes)} und {secs} {second_text(secs)}"
if minutes < 60:
return f"{minutes} {minute_text(minutes)}"
hours = minutes // 60
mins = minutes % 60
if mins == 0:
return f"{hours} {hour_text(hours)}"
return f"{hours} {hour_text(hours)} und {mins} {minute_text(mins)}"
def minute_text(n):
return "Minute" if n == 1 else "Minuten"
def second_text(n):
return "Sekunde" if n == 1 else "Sekunden"
def hour_text(n):
return "Stunde" if n == 1 else "Stunden"