diff --git a/.env.example b/.env.example index 917e75d..6ff48b0 100644 --- a/.env.example +++ b/.env.example @@ -3,3 +3,15 @@ GROQ_API_KEY=your_groq_api_key_here DEEPGRAM_API_KEY=your_deepgram_api_key_here + +# Optional: 60db cloud (TTS + STT). Defaults keep Deepgram. +# Flip switches independently: +# STT_PROVIDER=sixtydb -> 60db /ws/stt +# TTS_PROVIDER=sixtydb -> 60db /tts-stream -> ffplay +STT_PROVIDER= +TTS_PROVIDER= +SIXTYDB_API_KEY= +# SIXTYDB_API_BASE=https://api.60db.ai +# SIXTYDB_TTS_VOICE_ID=fbb75ed2-975a-40c7-9e06-38e30524a9a1 +# SIXTYDB_TTS_TRANSPORT=stream # stream (default) | sync | ws +# SIXTYDB_STT_LANGUAGE=en diff --git a/README.md b/README.md index 7dc2a45..0d98f98 100644 --- a/README.md +++ b/README.md @@ -198,6 +198,31 @@ Voice-Chat-Bot/ 4. Add tests if applicable 5. Submit a pull request +## 60db Provider (alongside Deepgram) + +This fork adds [60db](https://docs.60db.ai) as a peer of Deepgram for both TTS and STT. Each service has an independent env switch and defaults preserve the Deepgram + Groq path. + +| Concern | File | Switch | Endpoint | +|---|---|---|---| +| TTS | `sixtydb_tts.py` (`SixtyDbSpeechSynthesizer`) | `TTS_PROVIDER=sixtydb` + `SIXTYDB_TTS_TRANSPORT=stream\|sync\|ws` | `POST /tts-stream` (default, NDJSON mp3 chunks), `POST /tts-synthesize` (one-shot mp3), or `wss://api.60db.ai/ws/tts` (LINEAR16 PCM 24k, ffplay invoked with `-f s16le -ar 24000 -ac 1`) | +| STT | `sixtydb_stt.py` (`SixtyDbLiveTranscriber`) | `STT_PROVIDER=sixtydb` | `wss://api.60db.ai/ws/stt` browser mode (linear PCM 16k) | +| LLM | _no new file_ | manual edit | replace `ChatGroq(...)` in `Voice_Bot.py:113` with `ChatOpenAI(model="60db-tiny", base_url="https://api.60db.ai/v1", api_key=os.getenv("SIXTYDB_API_KEY"))` | + +Both 60db classes match their Deepgram counterparts' interfaces exactly (`speak(text)` and `async listen() -> str`), so `VoiceAssistant.__init__` only branches on env — no body changes elsewhere. + +```env +SIXTYDB_API_KEY=sk_live_... 
+TTS_PROVIDER=sixtydb +STT_PROVIDER=sixtydb +SIXTYDB_API_BASE=https://api.60db.ai +SIXTYDB_TTS_VOICE_ID=fbb75ed2-975a-40c7-9e06-38e30524a9a1 +SIXTYDB_STT_LANGUAGE=en +``` + +Extra Python deps for the 60db path: `websockets`, `pyaudio` (already required by Deepgram's `Microphone`). + +Reference: [docs.60db.ai](https://docs.60db.ai). + In case of any queries, please leave a message or contact me via the email provided in my profile.

diff --git a/Voice_Bot.py b/Voice_Bot.py index d874677..5882a78 100644 --- a/Voice_Bot.py +++ b/Voice_Bot.py @@ -184,9 +184,28 @@ class VoiceAssistant: TERMINATION_PHRASE = "goodbye" def __init__(self, config: Config): - self.transcriber = LiveTranscriber(config) + # Provider switches — each defaults to the original Deepgram/Groq + # path so existing setups keep working with no .env edits. + # STT_PROVIDER=sixtydb → 60db /ws/stt + # TTS_PROVIDER=sixtydb → 60db /tts-stream → ffplay + # The LLM stays on Groq here; the LLM can also be routed through + # 60db by editing LLMProcessor's ChatGroq to ChatOpenAI(base_url=...). + stt_provider = os.getenv("STT_PROVIDER", "deepgram").strip().lower() + tts_provider = os.getenv("TTS_PROVIDER", "deepgram").strip().lower() + + if stt_provider in ("sixtydb", "60db"): + from sixtydb_stt import SixtyDbLiveTranscriber + self.transcriber = SixtyDbLiveTranscriber(config) + else: + self.transcriber = LiveTranscriber(config) + self.llm_processor = LLMProcessor(config) - self.synthesizer = SpeechSynthesizer(config) + + if tts_provider in ("sixtydb", "60db"): + from sixtydb_tts import SixtyDbSpeechSynthesizer + self.synthesizer = SixtyDbSpeechSynthesizer(config) + else: + self.synthesizer = SpeechSynthesizer(config) async def run(self): """The main loop for the voice assistant.""" diff --git a/sixtydb_stt.py b/sixtydb_stt.py new file mode 100644 index 0000000..bb1edd1 --- /dev/null +++ b/sixtydb_stt.py @@ -0,0 +1,148 @@ +"""60db STT — peer of Voice_Bot.LiveTranscriber. + +Exposes `SixtyDbLiveTranscriber.listen()` async coroutine returning a +final transcript string, matching the LiveTranscriber interface in +Voice_Bot.py. Uses 60db /ws/stt browser mode (linear PCM 16k JSON +envelopes) and PyAudio for mic capture. + +The session closes after the first canonical final (is_final + +speech_final), mirroring how LiveTranscriber resolves its Future per +turn so the main loop advances identically. 
DEFAULT_API_BASE = "https://api.60db.ai"
SAMPLE_RATE = 16000
CHUNK_BYTES = int(SAMPLE_RATE * 2 * 0.06)  # 60 ms of 16-bit mono


class SixtyDbLiveTranscriber:
    """Drop-in peer of Voice_Bot.LiveTranscriber backed by 60db ``/ws/stt``.

    Configuration comes from the environment at construction time:
    ``SIXTYDB_API_KEY`` (required), ``SIXTYDB_API_BASE`` and
    ``SIXTYDB_STT_LANGUAGE`` (both optional).
    """

    def __init__(self, _config=None):
        """Read 60db settings from the environment.

        Args:
            _config: Accepted only for signature parity with
                LiveTranscriber; it is not used.

        Raises:
            ValueError: If ``SIXTYDB_API_KEY`` is unset or empty.
        """
        self.api_key = os.getenv("SIXTYDB_API_KEY")
        if not self.api_key:
            raise ValueError("SIXTYDB_API_KEY is not set in the environment.")
        self.api_base = (os.getenv("SIXTYDB_API_BASE") or DEFAULT_API_BASE).rstrip("/")
        # `or` (not a getenv default) so a blank `SIXTYDB_STT_LANGUAGE=` line
        # in .env falls back to "en" instead of sending an empty language.
        self.language = os.getenv("SIXTYDB_STT_LANGUAGE") or "en"

    def _ws_url(self) -> str:
        """Return the /ws/stt URL: api_base with a ws(s) scheme plus the key."""
        ws_base = self.api_base.replace("https://", "wss://").replace("http://", "ws://")
        return f"{ws_base}/ws/stt?apiKey={self.api_key}"

    @staticmethod
    def _is_canonical_final(msg: dict) -> bool:
        """Return True when *msg* carries both ``is_final`` and ``speech_final``."""
        return bool(msg.get("is_final") and msg.get("speech_final"))

    async def listen(self) -> str:
        """Capture one utterance from the microphone and return its transcript.

        Opens the default PyAudio input at SAMPLE_RATE, streams base64 PCM
        chunks to the socket once the server reports ``connected``, and stops
        at the first non-empty transcription flagged ``is_final`` and
        ``speech_final`` (or when the server sends ``session_stopped``).

        Returns:
            The final transcript, or ``""`` if the session ended without one.

        Raises:
            RuntimeError: If ``websockets`` or ``pyaudio`` is not installed.
        """
        try:
            import pyaudio
            import websockets
        except ImportError as e:
            raise RuntimeError(
                "60db STT requires 'websockets' and 'pyaudio': " + str(e)
            ) from e

        frames_per_read = CHUNK_BYTES // 2  # paInt16 mono: 2 bytes per frame
        final_text = ""
        stop_audio = asyncio.Event()
        sender_task = None
        stream = None
        pa = pyaudio.PyAudio()

        try:
            # Opened inside the try so a failing pa.open() (e.g. no input
            # device) still reaches pa.terminate() below.
            stream = pa.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=SAMPLE_RATE,
                input=True,
                frames_per_buffer=frames_per_read,
            )

            async with websockets.connect(self._ws_url(), max_size=None) as ws:
                # Skip past `connecting`, await `connection_established`.
                while True:
                    probe = json.loads(await ws.recv())
                    if "connection_established" in probe:
                        break

                await ws.send(json.dumps({
                    "type": "start",
                    "languages": [self.language],
                    "config": {
                        "encoding": "linear",
                        "sample_rate": SAMPLE_RATE,
                        "utterance_end_ms": 500,
                        "continuous_mode": False,
                        "interim_results_frequency": 300,
                        "audio_enhancement": "adaptive",
                    },
                }))

                async def _send_audio():
                    # Pump mic chunks until asked to stop or either side fails.
                    loop = asyncio.get_running_loop()
                    while not stop_audio.is_set():
                        try:
                            # Blocking read goes to the default executor;
                            # False = exception_on_overflow.
                            chunk = await loop.run_in_executor(
                                None, stream.read, frames_per_read, False
                            )
                        except OSError:
                            break
                        if not chunk:
                            continue
                        try:
                            await ws.send(json.dumps({
                                "type": "audio",
                                "audio": base64.b64encode(chunk).decode(),
                                "encoding": "linear",
                                "sample_rate": SAMPLE_RATE,
                            }))
                        except Exception:
                            break

                try:
                    async for raw in ws:
                        msg = json.loads(raw)
                        if not isinstance(msg, dict):
                            continue
                        mtype = msg.get("type")
                        if mtype == "connected":
                            if sender_task is None:
                                sender_task = asyncio.create_task(_send_audio())
                            continue
                        if mtype == "session_stopped":
                            break
                        if mtype != "transcription":
                            continue
                        text = (msg.get("text") or "").strip()
                        if not text or not self._is_canonical_final(msg):
                            continue
                        final_text = text
                        stop_audio.set()
                        # Best-effort goodbye: tell the server to stop and give
                        # it up to 2 s to answer before the socket is closed.
                        try:
                            await ws.send(json.dumps({"type": "stop"}))
                            await asyncio.wait_for(ws.recv(), timeout=2.0)
                        except Exception:
                            pass
                        break
                finally:
                    # Runs on errors too, so the sender never outlives the
                    # session and keeps reading a stream that is being closed.
                    stop_audio.set()
                    if sender_task is not None:
                        sender_task.cancel()
                        try:
                            await sender_task
                        except (asyncio.CancelledError, Exception):
                            pass
        finally:
            if stream is not None:
                try:
                    stream.stop_stream()
                    stream.close()
                except Exception:
                    pass
            pa.terminate()

        return final_text
DEFAULT_API_BASE = "https://api.60db.ai"
DEFAULT_VOICE_ID = "fbb75ed2-975a-40c7-9e06-38e30524a9a1"
WS_SAMPLE_RATE = 24000


class SixtyDbSpeechSynthesizer:
    """Drop-in peer of Voice_Bot.SpeechSynthesizer backed by 60db TTS.

    Audio is piped into an ``ffplay`` subprocess. The transport is chosen by
    ``SIXTYDB_TTS_TRANSPORT``: ``sync`` (POST /tts-synthesize), ``ws``
    (/ws/tts, raw LINEAR16 PCM) or anything else for the default NDJSON
    stream (POST /tts-stream).
    """

    def __init__(self, _config=None):
        """Read 60db settings from the environment.

        Args:
            _config: Accepted only for signature parity with
                SpeechSynthesizer; it is not used.

        Raises:
            ValueError: If ``SIXTYDB_API_KEY`` is unset or empty.
        """
        self.api_key = os.getenv("SIXTYDB_API_KEY")
        if not self.api_key:
            raise ValueError("SIXTYDB_API_KEY is not set in the environment.")
        self.api_base = (os.getenv("SIXTYDB_API_BASE") or DEFAULT_API_BASE).rstrip("/")
        # `or` so that blank `KEY=` lines in .env fall back to the defaults.
        self.voice_id = os.getenv("SIXTYDB_TTS_VOICE_ID") or DEFAULT_VOICE_ID
        self.transport = (os.getenv("SIXTYDB_TTS_TRANSPORT") or "stream").strip().lower()

    def speak(self, text: str) -> None:
        """Synthesize *text* and block until playback has finished.

        Empty or whitespace-only text is ignored without contacting the API.
        """
        if not text or not text.strip():
            return
        if self.transport == "sync":
            self._speak_sync(text)
        elif self.transport == "ws":
            self._speak_ws(text)
        else:
            self._speak_ndjson(text)

    # ---- helpers -----------------------------------------------------------

    @staticmethod
    def _ffplay_cmd(extra_args: list[str] | None = None) -> list[str]:
        """Build the ffplay argv; *extra_args* go before the stdin marker ``-``."""
        return ["ffplay", "-autoexit", "-nodisp", *(extra_args or []), "-"]

    @staticmethod
    def _spawn_ffplay(extra_args: list[str] | None = None) -> subprocess.Popen:
        """Start ffplay reading from a stdin pipe, with its own output discarded."""
        return subprocess.Popen(
            SixtyDbSpeechSynthesizer._ffplay_cmd(extra_args),
            stdin=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    @staticmethod
    def _pipe_audio(player, data: bytes) -> bool:
        """Write *data* to the player's stdin.

        Returns False instead of raising when the pipe is no longer writable
        (for example ffplay has already exited).
        """
        try:
            player.stdin.write(data)
            player.stdin.flush()
        except (OSError, ValueError):  # BrokenPipeError / write to closed file
            return False
        return True

    @staticmethod
    def _finish_player(player) -> None:
        """Close the player's stdin and wait for it to exit.

        A failing close must not skip ``wait()``, otherwise the child process
        is never reaped.
        """
        try:
            if player.stdin:
                player.stdin.close()
        except OSError:
            pass
        finally:
            player.wait()

    @staticmethod
    def _run_blocking(coro_factory):
        """Run ``coro_factory()`` to completion on a private event loop.

        The coroutine is driven by ``asyncio.run`` in a worker thread, so this
        also works when the caller is itself inside a running event loop
        (where ``run_until_complete`` on a second loop raises RuntimeError),
        and the private loop is always closed afterwards. Blocks the calling
        thread; exceptions from the coroutine propagate to the caller.
        """
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(lambda: asyncio.run(coro_factory())).result()

    def _auth_json_headers(self) -> dict:
        """Return bearer-auth headers for JSON POST requests."""
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

    # ---- sync REST (POST /tts-synthesize -> mp3) ---------------------------

    def _speak_sync(self, text: str) -> None:
        """Fetch the whole clip in one request, then play it."""
        payload = {
            "text": text,
            "voice_id": self.voice_id,
            "enhance": True,
            "speed": 1,
            "stability": 50,
            "similarity": 75,
            "output_format": "mp3",
        }
        start = time.time()
        try:
            r = requests.post(
                f"{self.api_base}/tts-synthesize",
                json=payload,
                headers=self._auth_json_headers(),
                timeout=60,
            )
            r.raise_for_status()
            data = r.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON error is not a RequestException.
            print(f"TTS Request Error: {e}")
            return

        if not data.get("success") or not data.get("audio_base64"):
            print(f"60db /tts-synthesize empty: {data.get('message')}")
            return
        try:
            mp3 = base64.b64decode(data["audio_base64"])
        except ValueError as e:  # binascii.Error is a ValueError
            print(f"60db /tts-synthesize returned undecodable audio: {e}")
            return

        ttfb = int((time.time() - start) * 1000)
        print(f"TTS TTFB: {ttfb}ms (sync REST)\n")
        player = self._spawn_ffplay()
        try:
            self._pipe_audio(player, mp3)
        finally:
            self._finish_player(player)

    # ---- NDJSON stream (POST /tts-stream -> mp3 chunks) --------------------

    def _speak_ndjson(self, text: str) -> None:
        """Stream base64 mp3 chunks from /tts-stream into ffplay as they arrive."""
        payload = {"text": text, "voice_id": self.voice_id, "output_format": "mp3"}
        player = self._spawn_ffplay()
        start = time.time()
        first = False
        try:
            with requests.post(
                f"{self.api_base}/tts-stream",
                json=payload,
                headers=self._auth_json_headers(),
                stream=True,
                timeout=120,
            ) as r:
                r.raise_for_status()
                for raw in r.iter_lines(decode_unicode=True):
                    line = (raw or "").strip()
                    if not line:
                        continue
                    try:
                        msg = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if not isinstance(msg, dict):
                        continue
                    kind = msg.get("type")
                    if kind == "error":
                        print(f"60db TTS stream error: {line[:200]}")
                        break
                    if kind == "complete":
                        break
                    content = msg.get("audioContent")
                    if not content:
                        continue
                    try:
                        chunk = base64.b64decode(content)
                    except ValueError:
                        continue  # skip a corrupt chunk rather than abort
                    if not first:
                        ttfb = int((time.time() - start) * 1000)
                        print(f"TTS TTFB: {ttfb}ms (NDJSON stream)\n")
                        first = True
                    if not self._pipe_audio(player, chunk):
                        print("60db TTS: ffplay exited before playback finished.")
                        break
        except requests.exceptions.RequestException as e:
            print(f"TTS Request Error: {e}")
        finally:
            self._finish_player(player)

    # ---- WebSocket (wss://.../ws/tts -> LINEAR16 PCM) ----------------------

    def _speak_ws(self, text: str) -> None:
        """Synthesize over /ws/tts and play the raw PCM through ffplay."""
        try:
            import websockets
        except ImportError as e:
            print(f"SIXTYDB_TTS_TRANSPORT=ws needs 'websockets': {e}")
            return
        # Raw 16-bit PCM needs explicit format flags so ffplay doesn't
        # try to auto-detect a container.
        player = self._spawn_ffplay(
            ["-f", "s16le", "-ar", str(WS_SAMPLE_RATE), "-ac", "1"]
        )
        start = time.time()

        async def _run() -> None:
            ws_base = self.api_base.replace("https://", "wss://").replace("http://", "ws://")
            url = f"{ws_base}/ws/tts?apiKey={self.api_key}"
            context_id = f"ctx-{os.getpid()}"
            first = False
            async with websockets.connect(url, max_size=None) as ws:
                async for raw in ws:
                    msg = json.loads(raw)
                    if not isinstance(msg, dict):
                        continue
                    if "connection_established" in msg:
                        await ws.send(json.dumps({
                            "create_context": {
                                "context_id": context_id,
                                "voice_id": self.voice_id,
                                "audio_config": {
                                    "audio_encoding": "LINEAR16",
                                    "sample_rate_hertz": WS_SAMPLE_RATE,
                                },
                            }
                        }))
                        continue
                    if "context_created" in msg:
                        await ws.send(json.dumps({
                            "send_text": {"context_id": context_id, "text": text}
                        }))
                        await ws.send(json.dumps({
                            "flush_context": {"context_id": context_id}
                        }))
                        continue
                    chunk_b64 = (msg.get("audio_chunk") or {}).get("audioContent")
                    if chunk_b64:
                        pcm = base64.b64decode(chunk_b64)
                        if not first:
                            ttfb = int((time.time() - start) * 1000)
                            print(f"TTS TTFB: {ttfb}ms (WebSocket)\n")
                            first = True
                        if not self._pipe_audio(player, pcm):
                            break  # ffplay is gone; no point receiving more
                        continue
                    if "flush_completed" in msg:
                        try:
                            await ws.send(json.dumps({
                                "close_context": {"context_id": context_id}
                            }))
                        except Exception:
                            pass
                        break

        try:
            self._run_blocking(_run)
        except Exception as e:
            print(f"60db WS TTS error: {e}")
        finally:
            self._finish_player(player)