Streaming Speech Transcription
accounts/fireworks/models/streaming-speech
Serverless · Audio
Fireworks Streaming Speech Transcription enables real-time transcription over WebSockets.
Streaming Speech Transcription is available via Fireworks' Streaming Speech-to-Text APIs; you are billed based on the duration of the transcribed audio.
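Because billing follows the audio's length rather than how long the connection stays open, you can estimate the billable duration of a file up front. A minimal sketch using torchaudio's metadata reader; the file path is the same illustrative one used in the example below:

import torchaudio

# Billing is based on the audio's duration, not wall-clock connection time.
info = torchaudio.info("/home/3.5m.flac")  # illustrative path from the example below
duration_seconds = info.num_frames / info.sample_rate
print(f"Billable audio duration: {duration_seconds:.1f}s")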
Generate a model response using the streaming-transcription endpoint of streaming-speech. See the API reference for full details.
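The walkthrough below depends on torch, torchaudio, and the websocket-client package (imported as websocket). A minimal setup, assuming pip:

pip install torch torchaudio websocket-client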
import os
from typing import Iterator, Tuple

import torch
import torchaudio

SAMPLE_RATE = 16_000

# Path to the audio file to transcribe (replace with your own).
FILE_PATH = "/home/3.5m.flac"


def _audio_tensor_to_bytes(value: torch.Tensor) -> bytes:
    """
    Convert a waveform Tensor to PCM bytes for streaming.
    """
    return (value * 32768.0).to(torch.int16).numpy().tobytes()


def _audio_path_to_tensor(path: str) -> torch.Tensor:
    """
    Load and optionally resample an audio file into a Torch tensor.
    """
    with open(path, "rb") as file:
        target_sr = SAMPLE_RATE
        waveform, original_sr = torchaudio.load(file)
        if original_sr != target_sr:
            resampler = torchaudio.transforms.Resample(
                orig_freq=original_sr, new_freq=target_sr
            )
            waveform = resampler(waveform)
        # Convert to mono if multiple channels
        if waveform.shape[0] > 1:
            waveform = torch.mean(waveform, dim=0, keepdim=True)
        return waveform


# Example chunk size in seconds:
chunk_seconds = 0.2
audio_tensor = _audio_path_to_tensor(FILE_PATH).squeeze()
chunk_size = int(chunk_seconds * SAMPLE_RATE)
audio_chunks = []
for i in range(0, len(audio_tensor), chunk_size):
    chunk = audio_tensor[i : i + chunk_size].unsqueeze(0)
    audio_chunk = _audio_tensor_to_bytes(chunk)
    audio_chunks.append((audio_chunk, chunk_seconds))

print(f"Loaded {len(audio_chunks)} chunks")

# WebSocket client for streaming audio transcription
import json
import threading
import time
import urllib.parse

import websocket

# Build the streaming endpoint (model path + any query parameters).
# We'll pass at least the language. You might add model, etc., as query params if needed.
ENDPOINT_URL_BASE = "wss://audio-streaming.us-virginia-1.direct.fireworks.ai"
ENDPOINT_PATH = "/v1/audio/transcriptions/streaming"
url_params = urllib.parse.urlencode({"language": "en"})
ENDPOINT_URL = f"{ENDPOINT_URL_BASE}{ENDPOINT_PATH}?{url_params}"
print(f"Connecting to: {ENDPOINT_URL}")


def run_websocket_client(audio_stream: Iterator[Tuple[bytes, float]]):
    """
    Send audio chunks over WebSocket for streaming transcription.
    """
    lock = threading.Lock()
    segments = {}

    def on_open(ws):
        def stream_audio(ws):
            # Stream each chunk, then sleep for the chunk's duration
            # to simulate real-time capture.
            for audio_chunk, duration in audio_stream:
                ws.send(audio_chunk, opcode=websocket.ABNF.OPCODE_BINARY)
                time.sleep(duration)
            # Give the server some time to finalize any last transcription segments
            time.sleep(10)
            ws.close()

        threading.Thread(target=stream_audio, args=(ws,)).start()

    def on_error(ws, error):
        print(f"Error: {error}")

    def on_message(ws, message):
        response = json.loads(message)
        if "error" in response:
            print(response["error"])
        else:
            with lock:
                for segment in response.get("segments", []):
                    segments[segment["id"]] = segment["text"]
                print("\n".join(f" - {k}: {v}" for k, v in segments.items()))

    ws = websocket.WebSocketApp(
        ENDPOINT_URL,
        # Authenticate with your Fireworks API key; here it is assumed to be
        # set in the FIREWORKS_API_KEY environment variable.
        header={"Authorization": os.environ["FIREWORKS_API_KEY"]},
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
    )
    ws.run_forever()


# Start streaming audio chunks for transcription
run_websocket_client(audio_chunks)
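The client accepts any iterator of (pcm_bytes, chunk_seconds) tuples, so the same code can transcribe live input instead of a file. As a usage example, here is a minimal sketch of streaming from a microphone; it assumes the third-party sounddevice package (not used above) and a mono input device captured at 16 kHz:

import sounddevice as sd  # assumed extra dependency, not part of the example above

def microphone_chunks(chunk_seconds: float = 0.2, total_seconds: float = 30.0):
    """Yield (pcm_bytes, chunk_seconds) tuples from the default microphone."""
    frames = int(chunk_seconds * SAMPLE_RATE)
    with sd.InputStream(samplerate=SAMPLE_RATE, channels=1, dtype="int16") as stream:
        for _ in range(int(total_seconds / chunk_seconds)):
            audio, _overflowed = stream.read(frames)  # int16 numpy array
            yield audio.tobytes(), chunk_seconds

# Stream roughly 30 seconds of microphone audio for transcription:
# run_websocket_client(microphone_chunks())

Capturing with dtype="int16" yields the same 16-bit PCM layout that _audio_tensor_to_bytes produces, so no separate float-to-int conversion step is needed.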