Streaming Speech Transcription
accounts/fireworks/models/streaming-speech
Fireworks Streaming Speech Transcription enables real-time transcription over WebSockets.
Streaming Speech Transcription is available through Fireworks' Streaming Speech-to-Text APIs; you are billed based on the duration of the audio transcribed.
Run queries immediately, pay only for usage
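
The example below walks through the full flow in Python: load an audio file, split it into 200 ms PCM chunks, then stream those chunks over a WebSocket and print transcription segments as they arrive. It requires the `torch`, `torchaudio`, and `websocket-client` packages; the snippets also assume (our assumption, adjust to your setup) that your API key is exported as `FIREWORKS_API_KEY`.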
```python
import torch
import torchaudio

SAMPLE_RATE = 16_000

# For demonstration, we stream a local file; replace with your own audio path.
FILE_PATH = "/home/3.5m.flac"


def _audio_tensor_to_bytes(value: torch.Tensor) -> bytes:
    """Convert a float waveform tensor to 16-bit PCM bytes for streaming."""
    return (value * 32768.0).to(torch.int16).numpy().tobytes()


def _audio_path_to_tensor(path: str) -> torch.Tensor:
    """Load an audio file, resample it to SAMPLE_RATE, and downmix to mono."""
    with open(path, "rb") as file:
        waveform, original_sr = torchaudio.load(file)
    if original_sr != SAMPLE_RATE:
        resampler = torchaudio.transforms.Resample(
            orig_freq=original_sr, new_freq=SAMPLE_RATE
        )
        waveform = resampler(waveform)
    # Convert to mono if the file has multiple channels.
    if waveform.shape[0] > 1:
        waveform = torch.mean(waveform, dim=0, keepdim=True)
    return waveform


# Split the audio into fixed-duration chunks (200 ms each).
chunk_seconds = 0.2
audio_tensor = _audio_path_to_tensor(FILE_PATH).squeeze()
chunk_size = int(chunk_seconds * SAMPLE_RATE)

audio_chunks = []
for i in range(0, len(audio_tensor), chunk_size):
    chunk = audio_tensor[i : i + chunk_size].unsqueeze(0)
    audio_chunks.append((_audio_tensor_to_bytes(chunk), chunk_seconds))

print(f"Loaded {len(audio_chunks)} chunks")
```

With the audio chunked, open a WebSocket connection to the streaming endpoint, send each chunk as a binary frame, and print the transcription segments the server returns:

```python
# WebSocket client for streaming audio transcription.
import json
import os
import threading
import time
import urllib.parse
from typing import Iterable, Tuple

import websocket

# Build the streaming endpoint URL (base host, path, and query parameters).
# We pass at least the language; other options can be added as query params.
ENDPOINT_URL_BASE = "wss://audio-streaming.us-virginia-1.direct.fireworks.ai"
ENDPOINT_PATH = "/v1/audio/transcriptions/streaming"
url_params = urllib.parse.urlencode({"language": "en"})
ENDPOINT_URL = f"{ENDPOINT_URL_BASE}{ENDPOINT_PATH}?{url_params}"
print(f"Connecting to: {ENDPOINT_URL}")


def run_websocket_client(audio_stream: Iterable[Tuple[bytes, float]]):
    """Send audio chunks over a WebSocket for streaming transcription."""
    lock = threading.Lock()
    segments = {}

    def on_open(ws):
        def stream_audio(ws):
            # Stream each chunk, then sleep for the chunk's duration to
            # simulate real-time audio.
            for audio_chunk, duration in audio_stream:
                ws.send(audio_chunk, opcode=websocket.ABNF.OPCODE_BINARY)
                time.sleep(duration)
            # Give the server some time to finalize the last segments.
            time.sleep(10)
            ws.close()

        threading.Thread(target=stream_audio, args=(ws,)).start()

    def on_error(ws, error):
        print(f"Error: {error}")

    def on_message(ws, message):
        response = json.loads(message)
        if "error" in response:
            print(response["error"])
            return
        # The server re-sends updated segments; keep the latest text per id.
        with lock:
            for segment in response.get("segments", []):
                segments[segment["id"]] = segment["text"]
            print("\n".join(f" - {k}: {v}" for k, v in segments.items()))

    ws = websocket.WebSocketApp(
        ENDPOINT_URL,
        # Authenticate with your Fireworks API key (here assumed to be in
        # the FIREWORKS_API_KEY environment variable).
        header={"Authorization": os.environ["FIREWORKS_API_KEY"]},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
    )
    ws.run_forever()


# Start streaming audio chunks for transcription.
run_websocket_client(audio_chunks)
```
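
The file-based client above replays pre-recorded audio at real-time speed, but `run_websocket_client` accepts any iterable of `(bytes, duration)` tuples. As a minimal sketch of live transcription, the generator below captures microphone audio with the third-party `sounddevice` package; that package and the `mic_chunks` helper are illustrative assumptions, not part of the Fireworks API.

```python
# Hedged sketch: live microphone capture with the third-party `sounddevice`
# package (assumed installed separately; not part of the Fireworks API).
from typing import Iterator, Tuple

import sounddevice as sd

SAMPLE_RATE = 16_000


def mic_chunks(chunk_seconds: float = 0.2) -> Iterator[Tuple[bytes, float]]:
    """Yield 16-bit PCM chunks captured from the default microphone."""
    frames = int(chunk_seconds * SAMPLE_RATE)
    with sd.RawInputStream(
        samplerate=SAMPLE_RATE, channels=1, dtype="int16"
    ) as stream:
        while True:  # Stop with Ctrl+C.
            data, _overflowed = stream.read(frames)
            # stream.read() already blocks for the chunk duration, so yield
            # a zero sleep time to avoid pacing the stream twice.
            yield bytes(data), 0.0


# run_websocket_client(mic_chunks())
```

Because the microphone read already blocks in real time, the generator reports a duration of `0.0` so that `stream_audio` does not sleep again after each send.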