Introduction

OpenAI Whisper is a state-of-the-art open-source speech recognition model that supports 99 languages with remarkable accuracy. In this tutorial, we'll build a complete speech-to-text API service that you can self-host — no cloud API costs required.

What You'll Build

  • A FastAPI REST service for audio transcription
  • Multi-language speech recognition (auto-detect or specify)
  • Speaker diarization (who said what)
  • Real-time streaming transcription via WebSocket
  • Docker deployment for production

Prerequisites

  • Python 3.10+
  • FFmpeg installed
  • GPU recommended (CUDA), but CPU works too
  • Basic knowledge of Python and REST APIs

Step 1: Project Setup

Create the project directory, set up a virtual environment, and install the dependencies:

```bash
mkdir whisper-api && cd whisper-api
python -m venv venv
source venv/bin/activate
pip install openai-whisper fastapi uvicorn python-multipart \
    pydub torch torchaudio websockets pyannote.audio
```

The finished project will be laid out like this:

```text
whisper-api/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── models.py
│   ├── transcriber.py
│   ├── diarizer.py
│   └── ws_stream.py
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── tests/
    └── test_api.py
```

Step 2: Core Transcription Engine

Create app/transcriber.py:

```python
import whisper
import torch
from functools import lru_cache
import logging

logger = logging.getLogger(__name__)


class WhisperTranscriber:
    """Wrapper around OpenAI Whisper for audio transcription."""

    def __init__(self, model_size: str = "base"):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        logger.info(f"Loading Whisper model '{model_size}' on {self.device}")
        self.model = whisper.load_model(model_size, device=self.device)
        logger.info("Model loaded successfully")

    def transcribe(
        self,
        audio_path: str,
        language: str | None = None,
        task: str = "transcribe",  # or "translate" (to English)
        word_timestamps: bool = False,
    ) -> dict:
        """Transcribe an audio file.

        Args:
            audio_path: Path to audio file (any format FFmpeg supports)
            language: ISO language code (None for auto-detect)
            task: 'transcribe' or 'translate'
            word_timestamps: Include word-level timestamps

        Returns:
            dict with text, language, and segments
        """
        options = {
            "task": task,
            "word_timestamps": word_timestamps,
            "verbose": False,
        }
        if language:
            options["language"] = language

        result = self.model.transcribe(audio_path, **options)

        return {
            "text": result["text"].strip(),
            "language": result["language"],
            "segments": [
                {
                    "id": seg["id"],
                    "start": round(seg["start"], 2),
                    "end": round(seg["end"], 2),
                    "text": seg["text"].strip(),
                    **({
                        "words": [
                            {
                                "word": w["word"],
                                "start": round(w["start"], 2),
                                "end": round(w["end"], 2),
                                "probability": round(w["probability"], 3),
                            }
                            for w in seg.get("words", [])
                        ]
                    } if word_timestamps else {}),
                }
                for seg in result["segments"]
            ],
        }

    def detect_language(self, audio_path: str) -> dict:
        """Detect the spoken language in an audio file."""
        audio = whisper.load_audio(audio_path)
        audio = whisper.pad_or_trim(audio)
        # Match the model's mel bin count (large-v3 uses 128 instead of 80).
        mel = whisper.log_mel_spectrogram(audio, n_mels=self.model.dims.n_mels).to(self.device)
        _, probs = self.model.detect_language(mel)
        top_5 = sorted(probs.items(), key=lambda x: x[1], reverse=True)[:5]
        return {
            "detected_language": top_5[0][0],
            "confidence": round(top_5[0][1], 4),
            "top_languages": [
                {"language": lang, "probability": round(prob, 4)}
                for lang, prob in top_5
            ],
        }


@lru_cache(maxsize=1)
def get_transcriber(model_size: str = "base") -> WhisperTranscriber:
    """Singleton factory for the transcriber."""
    return WhisperTranscriber(model_size)
```
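
Before wiring this into FastAPI, you can sanity-check the engine on its own. A minimal sketch, assuming some local audio file (the path below is a placeholder; any FFmpeg-readable format works):

```python
# Quick local check of the transcription engine, outside FastAPI.
# "sample.mp3" is a placeholder path; swap in any audio file you have.
from app.transcriber import get_transcriber

transcriber = get_transcriber("base")  # downloads the model on first run

detection = transcriber.detect_language("sample.mp3")
print(detection["detected_language"], detection["confidence"])

result = transcriber.transcribe("sample.mp3", word_timestamps=True)
print(result["language"])
for seg in result["segments"]:
    print(f'[{seg["start"]:>6.2f} - {seg["end"]:>6.2f}] {seg["text"]}')
```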

Step 3: Pydantic Models

Create app/models.py:

```python
from pydantic import BaseModel, Field
from enum import Enum


class ModelSize(str, Enum):
    tiny = "tiny"
    base = "base"
    small = "small"
    medium = "medium"
    large = "large-v3"


class TaskType(str, Enum):
    transcribe = "transcribe"
    translate = "translate"


class TranscriptionRequest(BaseModel):
    language: str | None = Field(None, description="ISO language code (auto-detect if omitted)")
    task: TaskType = Field(TaskType.transcribe, description="transcribe or translate to English")
    word_timestamps: bool = Field(False, description="Include word-level timestamps")
    diarize: bool = Field(False, description="Enable speaker diarization")


class WordInfo(BaseModel):
    word: str
    start: float
    end: float
    probability: float


class Segment(BaseModel):
    id: int
    start: float
    end: float
    text: str
    speaker: str | None = None
    words: list[WordInfo] = []


class TranscriptionResponse(BaseModel):
    text: str
    language: str
    duration: float | None = None
    segments: list[Segment]
    speakers: list[str] | None = None


class LanguageDetectionResponse(BaseModel):
    detected_language: str
    confidence: float
    top_languages: list[dict]
```
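
To see how these models shape the API contract, here's a quick sketch that validates a hand-written payload. The values are invented for illustration, and it assumes pydantic v2 (what a fresh FastAPI install pulls in):

```python
# Validate an invented payload against the response model.
from app.models import TranscriptionResponse

payload = {
    "text": "hello world",
    "language": "en",
    "duration": 1.23,
    "segments": [{"id": 0, "start": 0.0, "end": 1.1, "text": "hello world"}],
}
resp = TranscriptionResponse(**payload)
print(resp.segments[0].speaker)        # None (optional fields get defaults)
print(resp.model_dump_json(indent=2))  # what the API returns to clients
```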

Step 4: Speaker Diarization

Create app/diarizer.py:

```python
from pyannote.audio import Pipeline
import torch
import logging

logger = logging.getLogger(__name__)


class SpeakerDiarizer:
    """Speaker diarization using pyannote.audio."""

    def __init__(self, hf_token: str):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logger.info("Loading diarization pipeline...")
        self.pipeline = Pipeline.from_pretrained(
            "pyannote/speaker-diarization-3.1",
            use_auth_token=hf_token,
        ).to(self.device)
        logger.info("Diarization pipeline loaded")

    def diarize(self, audio_path: str) -> list[dict]:
        """Run speaker diarization on an audio file.

        Returns a list of {start, end, speaker} segments.
        """
        diarization = self.pipeline(audio_path)
        segments = []
        for turn, _, speaker in diarization.itertracks(yield_label=True):
            segments.append({
                "start": round(turn.start, 2),
                "end": round(turn.end, 2),
                "speaker": speaker,
            })
        return segments

    def assign_speakers(
        self, transcription_segments: list[dict], diarization_segments: list[dict]
    ) -> list[dict]:
        """Assign speaker labels to transcription segments."""
        for t_seg in transcription_segments:
            mid = (t_seg["start"] + t_seg["end"]) / 2
            best_speaker = "UNKNOWN"
            min_dist = float("inf")
            for d_seg in diarization_segments:
                # Exact hit: the segment midpoint falls inside a speaker turn.
                if d_seg["start"] <= mid <= d_seg["end"]:
                    best_speaker = d_seg["speaker"]
                    break
                # Otherwise remember the closest turn as a fallback.
                dist = min(abs(mid - d_seg["start"]), abs(mid - d_seg["end"]))
                if dist < min_dist:
                    min_dist = dist
                    best_speaker = d_seg["speaker"]
            t_seg["speaker"] = best_speaker
        return transcription_segments
```
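
Since assign_speakers only does timestamp matching, you can see the midpoint logic in action without loading the pyannote pipeline. A small illustration with fabricated timestamps (the unbound call is just a trick to avoid instantiating the class):

```python
from app.diarizer import SpeakerDiarizer

transcription = [
    {"id": 0, "start": 0.0, "end": 4.0, "text": "Hi, thanks for joining."},
    {"id": 1, "start": 4.2, "end": 7.5, "text": "Happy to be here."},
]
diarization = [
    {"start": 0.0, "end": 4.1, "speaker": "SPEAKER_00"},
    {"start": 4.1, "end": 8.0, "speaker": "SPEAKER_01"},
]

# Midpoints 2.0s and 5.85s fall inside SPEAKER_00's and SPEAKER_01's turns.
labeled = SpeakerDiarizer.assign_speakers(None, transcription, diarization)
for seg in labeled:
    print(seg["speaker"], seg["text"])
# SPEAKER_00 Hi, thanks for joining.
# SPEAKER_01 Happy to be here.
```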

Step 5: FastAPI Application

Create app/main.py:

```python
import os
import tempfile
import time
import logging
from contextlib import asynccontextmanager
from pathlib import Path

from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.middleware.cors import CORSMiddleware

from .transcriber import get_transcriber
from .models import (
    TranscriptionResponse,
    LanguageDetectionResponse,
    TaskType,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MODEL_SIZE = os.getenv("WHISPER_MODEL", "base")
HF_TOKEN = os.getenv("HF_TOKEN", "")

diarizer = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Pre-load model on startup."""
    get_transcriber(MODEL_SIZE)
    if HF_TOKEN:
        global diarizer
        from .diarizer import SpeakerDiarizer
        diarizer = SpeakerDiarizer(HF_TOKEN)
    yield


app = FastAPI(
    title="Whisper Speech-to-Text API",
    version="1.0.0",
    lifespan=lifespan,
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.post("/transcribe", response_model=TranscriptionResponse)
async def transcribe_audio(
    file: UploadFile = File(..., description="Audio file (mp3, wav, m4a, etc.)"),
    language: str | None = Form(None),
    task: TaskType = Form(TaskType.transcribe),
    word_timestamps: bool = Form(False),
    diarize: bool = Form(False),
):
    """Transcribe an uploaded audio file."""
    if not file.content_type or not (
        file.content_type.startswith("audio/") or file.content_type == "video/mp4"
    ):
        raise HTTPException(400, "File must be an audio file")

    start_time = time.time()
    suffix = Path(file.filename or "audio.wav").suffix or ".wav"

    with tempfile.NamedTemporaryFile(suffix=suffix, delete=True) as tmp:
        content = await file.read()
        tmp.write(content)
        tmp.flush()

        transcriber = get_transcriber(MODEL_SIZE)
        result = transcriber.transcribe(
            tmp.name,
            language=language,
            task=task.value,
            word_timestamps=word_timestamps,
        )

        # Speaker diarization
        speakers = None
        if diarize and diarizer:
            d_segments = diarizer.diarize(tmp.name)
            result["segments"] = diarizer.assign_speakers(
                result["segments"], d_segments
            )
            speakers = sorted(set(s["speaker"] for s in d_segments))
        elif diarize and not diarizer:
            raise HTTPException(400, "Diarization not available (set HF_TOKEN)")

    duration = round(time.time() - start_time, 2)

    return TranscriptionResponse(
        text=result["text"],
        language=result["language"],
        duration=duration,
        segments=result["segments"],
        speakers=speakers,
    )


@app.post("/detect-language", response_model=LanguageDetectionResponse)
async def detect_language(
    file: UploadFile = File(..., description="Audio file"),
):
    """Detect the spoken language in an audio file."""
    suffix = Path(file.filename or "audio.wav").suffix or ".wav"
    with tempfile.NamedTemporaryFile(suffix=suffix, delete=True) as tmp:
        tmp.write(await file.read())
        tmp.flush()
        transcriber = get_transcriber(MODEL_SIZE)
        return transcriber.detect_language(tmp.name)


@app.get("/health")
async def health():
    return {"status": "ok", "model": MODEL_SIZE}
```

Step 6: WebSocket Streaming

Create app/ws_stream.py:

```python
import tempfile

import numpy as np
import soundfile as sf
from fastapi import WebSocket, WebSocketDisconnect

from .transcriber import get_transcriber

CHUNK_DURATION = 5  # seconds per chunk
SAMPLE_RATE = 16000


async def websocket_transcribe(websocket: WebSocket, model_size: str):
    """Real-time streaming transcription via WebSocket.

    Client sends raw PCM audio chunks (16 kHz, 16-bit, mono).
    Server returns JSON transcription for each chunk.
    """
    await websocket.accept()
    transcriber = get_transcriber(model_size)
    buffer = bytearray()
    chunk_bytes = CHUNK_DURATION * SAMPLE_RATE * 2  # 16-bit = 2 bytes/sample

    try:
        while True:
            data = await websocket.receive_bytes()
            buffer.extend(data)

            while len(buffer) >= chunk_bytes:
                chunk = bytes(buffer[:chunk_bytes])
                buffer = buffer[chunk_bytes:]

                # Convert raw PCM to numpy
                audio_np = (
                    np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / 32768.0
                )

                # Save to temp file for Whisper
                with tempfile.NamedTemporaryFile(suffix=".wav", delete=True) as tmp:
                    sf.write(tmp.name, audio_np, SAMPLE_RATE)
                    result = transcriber.transcribe(tmp.name)

                await websocket.send_json({
                    "text": result["text"],
                    "language": result["language"],
                    "is_final": True,
                })
    except WebSocketDisconnect:
        pass
```

    Add the WebSocket route to main.py:

```python
from fastapi import WebSocket  # or add WebSocket to the existing fastapi import

from .ws_stream import websocket_transcribe


@app.websocket("/ws/transcribe")
async def ws_transcribe(websocket: WebSocket):
    await websocket_transcribe(websocket, MODEL_SIZE)
```
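
The tutorial doesn't ship a streaming client, so here is a minimal sketch of one. It assumes the websockets package on the client side and a WAV file already converted to 16 kHz, 16-bit mono; the filename is a placeholder:

```python
# Stream a 16 kHz, 16-bit, mono WAV file to /ws/transcribe and print results.
import asyncio
import json
import wave

import websockets


async def stream_file(path: str, url: str = "ws://localhost:8000/ws/transcribe") -> None:
    async with websockets.connect(url) as ws:
        with wave.open(path, "rb") as wav:
            assert wav.getframerate() == 16000
            assert wav.getnchannels() == 1 and wav.getsampwidth() == 2
            while True:
                data = wav.readframes(16000)  # ~1 second of audio per message
                if not data:
                    break
                await ws.send(data)
                # Print any transcripts the server has produced so far.
                try:
                    msg = await asyncio.wait_for(ws.recv(), timeout=0.1)
                    print(json.loads(msg)["text"])
                except asyncio.TimeoutError:
                    pass


# Convert your audio first, e.g.:
#   ffmpeg -i input.mp3 -ar 16000 -ac 1 -c:a pcm_s16le meeting_16k_mono.wav
asyncio.run(stream_file("meeting_16k_mono.wav"))
```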

Step 7: Docker Deployment

Create Dockerfile:

```dockerfile
FROM python:3.11-slim

RUN apt-get update && apt-get install -y --no-install-recommends \
    ffmpeg \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY app/ app/

EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
```

Create docker-compose.yml:

```yaml
version: "3.8"

services:
  whisper-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - WHISPER_MODEL=base
      - HF_TOKEN=${HF_TOKEN:-}
    volumes:
      - whisper-cache:/root/.cache
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    restart: unless-stopped

volumes:
  whisper-cache:
```

The deploy block requests a GPU, which requires the NVIDIA Container Toolkit on the host; on a CPU-only machine, remove that block and the service will still run.

Create requirements.txt:

```text
openai-whisper>=20231117
fastapi>=0.109.0
uvicorn[standard]>=0.27.0
python-multipart>=0.0.6
pydub>=0.25.1
torch>=2.1.0
torchaudio>=2.1.0
pyannote.audio>=3.1.0
soundfile>=0.12.1
websockets>=12.0
numpy>=1.24.0
```

Step 8: Testing the API

Start the server:

```bash
# Local
uvicorn app.main:app --reload

# Or with Docker
docker compose up --build
```

Test transcription with curl. Note the explicit ;type= on the /transcribe uploads: curl labels most files application/octet-stream, which the endpoint's content-type check would reject:

```bash
# Basic transcription
curl -X POST http://localhost:8000/transcribe \
  -F "file=@meeting_recording.mp3;type=audio/mpeg"

# With options
curl -X POST http://localhost:8000/transcribe \
  -F "file=@japanese_audio.wav;type=audio/wav" \
  -F "language=ja" \
  -F "word_timestamps=true"

# Translate to English
curl -X POST http://localhost:8000/transcribe \
  -F "file=@french_podcast.mp3;type=audio/mpeg" \
  -F "task=translate"

# Language detection (this endpoint has no content-type check)
curl -X POST http://localhost:8000/detect-language \
  -F "file=@unknown_language.wav"
```

Example response:

```json
{
  "text": "Hello, welcome to our weekly team meeting.",
  "language": "en",
  "duration": 2.34,
  "segments": [
    {
      "id": 0,
      "start": 0.0,
      "end": 3.52,
      "text": "Hello, welcome to our weekly team meeting.",
      "speaker": null,
      "words": []
    }
  ],
  "speakers": null
}
```
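
The segments array maps almost directly onto subtitle formats, which is why SRT/VTT output is listed as a next step at the end of this tutorial. As a taste, here is a small sketch that turns a response like the one above into SRT text (the helper names are mine, not part of the API):

```python
# Convert a /transcribe JSON response into SRT subtitle text (illustrative helpers).
def srt_timestamp(seconds: float) -> str:
    ms = int(round(seconds * 1000))
    h, ms = divmod(ms, 3_600_000)
    m, ms = divmod(ms, 60_000)
    s, ms = divmod(ms, 1_000)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"


def to_srt(response: dict) -> str:
    blocks = []
    for i, seg in enumerate(response["segments"], start=1):
        blocks.append(
            f"{i}\n"
            f"{srt_timestamp(seg['start'])} --> {srt_timestamp(seg['end'])}\n"
            f"{seg['text'].strip()}\n"
        )
    return "\n".join(blocks)
```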

Step 9: Python Client

A small client wrapping the two endpoints:

```python
import mimetypes
from pathlib import Path

import requests


class WhisperClient:
    def __init__(self, base_url: str = "http://localhost:8000"):
        self.base_url = base_url

    def transcribe(
        self,
        audio_path: str,
        language: str | None = None,
        task: str = "transcribe",
        word_timestamps: bool = False,
        diarize: bool = False,
    ) -> dict:
        data = {
            "task": task,
            "word_timestamps": str(word_timestamps).lower(),
            "diarize": str(diarize).lower(),
        }
        if language:
            data["language"] = language
        # Send an explicit audio content type so the API's check passes;
        # requests would otherwise omit it from the multipart part.
        mime = mimetypes.guess_type(audio_path)[0] or "audio/mpeg"
        with open(audio_path, "rb") as f:
            resp = requests.post(
                f"{self.base_url}/transcribe",
                files={"file": (Path(audio_path).name, f, mime)},
                data=data,
            )
        resp.raise_for_status()
        return resp.json()

    def detect_language(self, audio_path: str) -> dict:
        with open(audio_path, "rb") as f:
            resp = requests.post(
                f"{self.base_url}/detect-language",
                files={"file": f},
            )
        resp.raise_for_status()
        return resp.json()


# Usage
client = WhisperClient()
result = client.transcribe("meeting.mp3", diarize=True)
for seg in result["segments"]:
    speaker = seg.get("speaker", "")
    print(f"[{seg['start']:.1f}s - {seg['end']:.1f}s] {speaker}: {seg['text']}")
```

Model Size Comparison

| Model | Parameters | VRAM | Relative Speed | English WER |
|----------|------------|--------|----------------|-------------|
| tiny | 39M | ~1 GB | ~32x | ~7.7% |
| base | 74M | ~1 GB | ~16x | ~5.0% |
| small | 244M | ~2 GB | ~6x | ~3.4% |
| medium | 769M | ~5 GB | ~2x | ~2.9% |
| large-v3 | 1550M | ~10 GB | 1x | ~2.0% |

Performance Tips

1. Use a GPU: CUDA gives a 10-30x speedup over CPU
2. Choose the right model: base for speed, large-v3 for accuracy
3. Batch processing: queue multiple files with a task worker (Celery/RQ)
4. FP16: Whisper uses FP16 on GPU by default; don't force FP32
5. Trim silence: pre-process to remove leading/trailing silence (see the sketch below)
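
Tip 5 can be done with pydub, which is already in requirements.txt. A rough sketch, with thresholds that are only starting guesses and should be tuned per recording:

```python
# Trim leading/trailing silence before sending audio to Whisper.
from pydub import AudioSegment
from pydub.silence import detect_nonsilent


def trim_silence(in_path: str, out_path: str) -> str:
    audio = AudioSegment.from_file(in_path)
    # Anything quieter than (average loudness - 16 dB) for 500 ms+ counts as silence.
    ranges = detect_nonsilent(audio, min_silence_len=500, silence_thresh=audio.dBFS - 16)
    if ranges:
        audio = audio[ranges[0][0]:ranges[-1][1]]
    audio.export(out_path, format="wav")
    return out_path
```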

Conclusion

You now have a self-hosted speech-to-text API that rivals commercial services. Whisper's multilingual capability makes it perfect for international applications, and the FastAPI wrapper gives you a production-ready interface with automatic documentation at /docs.

Next steps:

  • Add Celery for async batch processing
  • Implement subtitle generation (SRT/VTT output)
  • Add a web UI for drag-and-drop transcription
  • Fine-tune Whisper on domain-specific audio