Tutorial

Feb 18, 2025

Creating a realtime AI Commentator with Cerebrium, LiveKit and Cartesia

Michael Louis

CEO & Founder

Live commentary has long been a staple of sports, entertainment, and broadcasting, bringing events to life with expert narration and real-time reactions. But what if AI could do the same—instantly analyzing plays, adding emotion, and delivering seamless commentary without human input?

In this article, we explore how AI models can power real-time sports analysis, movie trailer narration, and much more, all with natural speech and emotional control. By combining LiveKit for real-time video processing, Cerebrium for scalable AI infrastructure, and Cartesia for expressive voice synthesis, we’ve built an AI commentator that can react and narrate in real time. We’ll dive into the challenges, technical architecture, and what this means for the future of live broadcasting.

You can find the final source code here.

Setup Cerebrium

If you don't have a Cerebrium account, you can create one by signing up here and following the documentation here to get set up.

In your IDE, run the following command to create the Cerebrium starter project: cerebrium init realtime-video-explainer. This creates two files:

  • main.py - The entrypoint file where our code lives

  • cerebrium.toml - A configuration file that contains all the build and environment settings

Add the pip packages shown below near the bottom of your cerebrium.toml. These will be used to create the deployment environment.

To start, we are going to deploy the model that will interpret the frames we send it and return text within a few hundred milliseconds. The model we will be using is openbmb/MiniCPM-o-2_6. The reason we are using this open-source model over Anthropic's Claude or OpenAI's GPT-4o is that the response times we get are roughly 50% faster and much more reliable, since we aren't being rate limited or affected by other users' requests.

Add the following to your cerebrium.toml:

[cerebrium.deployment]
name = "realtime-video-explainer"
python_version = "3.11"
docker_base_image_url = "debian:bookworm-slim"
disable_auth = false
include = ['./*', 'main.py', 'cerebrium.toml']
exclude = ['.*']

[cerebrium.hardware]
cpu = 4
memory = 16.0
compute = "ADA_L40"

[cerebrium.scaling]
min_replicas = 0
max_replicas = 5
cooldown = 30
replica_concurrency = 1
response_grace_period = 1800
scaling_metric = "concurrency_utilization"
scaling_target = 100
scaling_buffer = 2

[cerebrium.dependencies.pip]
numpy = "<2"
torch = "==2.2.0"
Pillow = "==10.1.0"
torchaudio = "==2.2.0"
torchvision = "==0.17.0"
transformers = "latest"
librosa = "==0.9.0"
soundfile = "==0.12.1"
vector-quantize-pytorch = "==1.18.5"
vocos = "==0.1.0"
decord = "latest"
moviepy = "latest"
"huggingface-hub[hf_transfer]" = "latest"
livekit-api = "latest"
python-dotenv = "latest"

Above we do quite a few things:

  • We define the hardware requirements for the model. We found L40s to be a good trade-off between cost and performance.

  • We define the scaling criteria, which essentially says that one GPU can handle one request (replica_concurrency = 1) and that we should always keep two extra GPUs running beyond the number of current requests (scaling_buffer = 2), so our users don't experience any delay from model loading times. In effect, with three active requests there would be five replicas running.

  • Lastly, we define all the Python packages required for the deployment to work.

Next, let's write the functionality for the model endpoint. Add the following to your main.py:

import os
import base64
from io import BytesIO
from huggingface_hub import login
from transformers import AutoModel, AutoTokenizer
from PIL import Image
import torch
from livekit import api
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

os.environ["HF_TRANSFER"] = "1"
login(token=os.environ["HF_TOKEN"])

model = AutoModel.from_pretrained(
    'openbmb/MiniCPM-o-2_6',
    trust_remote_code=True,
    attn_implementation='sdpa', # sdpa or flash_attention_2
    torch_dtype=torch.bfloat16,
    init_vision=True,
    init_audio=False,
    init_tts=False,
)


model = model.eval().cuda()
tokenizer = AutoTokenizer.from_pretrained('openbmb/MiniCPM-o-2_6', trust_remote_code=True)

async def run(images: list, question: str, conversation_history: list = None, max_tokens: int = 7, temperature: float = 0.7):
    if conversation_history is None:
        conversation_history = []

    pil_images = []
    for img_base64 in images:
        try:
            image_bytes = base64.b64decode(img_base64)
            pil_image = Image.open(BytesIO(image_bytes)).convert('RGB')
            pil_images.append(pil_image)
        except Exception as e:
            print(f"Error processing image: {str(e)}")
            continue
    
    # Create message with multiple images
    msgs = [
        *conversation_history,
        {
            'role': 'user', 
            'content': [*pil_images, question] 
        }
    ]

   
    res = model.chat(
        msgs=msgs,
        tokenizer=tokenizer,
        sampling=True,
        stream=True,
        temperature=temperature,  # Add temperature to control randomness
        top_p=0.9  # Add top_p to filter unlikely tokens
    )
    generated_text = ""
    for new_text in res:
        generated_text += new_text
        print(new_text, flush=True, end='')
        yield new_text

def create_token(room_name: str = "my-room"):
    token = api.AccessToken(os.getenv('LIVEKIT_API_KEY'), os.getenv('LIVEKIT_API_SECRET')) \
    .with_identity("identity") \
    .with_name("my name") \
    .with_grants(api.VideoGrants(
        room_join=True,
        room=room_name,
    ))
    return {"token": token.to_jwt()}

In the above:

  • We load the environment variables used to download the Hugging Face model and to authorize the LiveKit token endpoint - we will create this .env file in the next step.

  • We initialize the model and create a function that takes image frames, a question and the conversation history as parameters. When we deploy the code, Cerebrium turns this function into an autoscaling endpoint, and the parameters are passed as JSON in the request body (see the sketch after this list).

  • Lastly, we create an endpoint to get a token for authenticating users joining our LiveKit room; we will need this for the frontend later. It might seem expensive to run this on a GPU, but since one GPU serves one room and the token is created before a user joins, a container will be running anyway.
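To make that mapping concrete, here is a rough sketch of what a request to the deployed run endpoint could look like once the app is deployed (we do this a little further down). The project ID in the URL is a placeholder, CEREBRIUM_API_KEY is the API key from your Cerebrium dashboard, and the question text is just an example; the payload keys mirror the parameters of run() above.

import base64
import os
import requests

# Base64-encode a single frame from disk, matching the `images` parameter of run()
with open("frame.jpg", "rb") as f:
    frame_b64 = base64.b64encode(f.read()).decode("utf-8")

payload = {
    "images": [frame_b64],              # list of base64-encoded frames
    "question": "What is happening in this scene?",
    "conversation_history": [],         # optional prior messages
    "temperature": 0.7,
}

# Cerebrium exposes each function in main.py as a POST endpoint under the app's base URL
response = requests.post(
    "https://api.cortex.cerebrium.ai/v4/p-xxxxxx/realtime-video-explainer/run",
    json=payload,
    headers={"Authorization": f"Bearer {os.environ['CEREBRIUM_API_KEY']}"},
    stream=True,  # the endpoint streams the generated text back
)
response.raise_for_status()
for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
    print(chunk, end="", flush=True)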

Now we need to create the .env file to populate the variables above, but first you need a LiveKit account. LiveKit is a platform that makes it easy to build real-time audio and video apps.

You can sign up for an account here (they have a generous free tier). Once your account is created, run the following command to install their CLI package (Homebrew can be installed from here).

brew update && brew install livekit-cli

Once installed, authenticate the CLI with your newly created account by running the following:

lk cloud auth

Next, you need your LiveKit credentials: go to the LiveKit dashboard, open the Settings tab to find your project URL, then go to the Keys tab to create a new key and copy both the API key and the secret.


Next let’s create the .env file with the following:

export LIVEKIT_API_KEY=xxx
export LIVEKIT_API_SECRET=xxxx
export HF_TOKEN=xxx

You can get the Hugging Face token from your Hugging Face profile, under Access Tokens.

You can now run cerebrium deploy to deploy this app. Once deployed, you should see a deployment URL that looks something like https://dashboard.cerebrium.ai/projects/p-xxxxxx/apps/p-xxxxxxx-realtime-video-explainer. We will need this for our app later.
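As an optional sanity check, you can hit the create_token endpoint of the deployed app. This is just a sketch: the project ID is a placeholder, it assumes your LiveKit keys are available to the deployment (for example via your Cerebrium secrets), and the response uses the same result wrapper the frontend destructures later.

import os
import requests

resp = requests.post(
    "https://api.cortex.cerebrium.ai/v4/p-xxxxxx/realtime-video-explainer/create_token",
    json={"room_name": "my-room"},
    headers={"Authorization": f"Bearer {os.environ['CEREBRIUM_API_KEY']}"},
)
resp.raise_for_status()
print(resp.json()["result"]["token"])  # a LiveKit JWT for joining the room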

Note: For this deployment we set scaling_buffer to 2, which means there will always be at least two instances running. While you build out the rest of the app, you might want to set this to 0, either by updating your cerebrium.toml and redeploying or by changing the value in your Cerebrium dashboard.

LiveKit Implementation

We set up LiveKit in the step above; now we need to set up the LiveKit workers that stream video frames from the frontend to the Cerebrium endpoint and send audio frames back.

Let's create a separate Cerebrium app to handle the LiveKit deployment. The reasons we create a separate app are:

  1. The LiveKit workers run on a CPU

  2. LiveKit has different scaling parameters and so will autoscale at a different pace compared to the model we deployed above.

In a separate directory, run cerebrium init livekit-commentator. In your cerebrium.toml, add the following:

[cerebrium.deployment]
name = "livekit-commentator"
python_version = "3.11"
docker_base_image_url = "debian:bookworm-slim"
disable_auth = false
include = ['./*', 'main.py', 'cerebrium.toml']
exclude = ['.*',]

[cerebrium.hardware]
cpu = 2
memory = 12.0
compute = "CPU"

[cerebrium.scaling]
min_replicas = 1
max_replicas = 5
cooldown = 30
replica_concurrency = 1
response_grace_period = 900
scaling_metric = "cpu_utilization"
scaling_target = 80

[cerebrium.dependencies.pip]
"livekit-agents" = ">=0.11.1"
"livekit-plugins-silero" = ">=0.7.3"
"python-dotenv" = "latest"
"aiofile" = "latest"
"fastapi" = "latest"
"uvicorn" = "latest"
"httpx" = "latest"
"pyaudio" = "latest"
cartesia = "latest"
"opencv-python" = "latest"

[cerebrium.dependencies.apt]
portaudio19-dev = "latest"
ffmpeg = "latest"

[cerebrium.runtime.custom]
port = 8600
entrypoint = ["python", "main.py", "start"]

In the above there are only a few key differences to note:

  • We are running this on CPU hardware

  • We set the scaling criteria to autoscale our application when the CPU reaches 80% utilization.

  • We are using a custom runtime (our own FastAPI app) since LiveKit is a continuously running pool of workers and we need to make Cerebrium aware of this.

Before jumping into the code for main.py, it's worth looking at how we implemented our solution, why we did it that way, and the challenges we faced.

LiveKit sends us video frames every 50ms to interpret. The model deployed on Cerebrium takes roughly 500ms to return what is happening in the frames, and Cartesia takes roughly 180ms to generate the voice frames that we then send back to the frontend. Essentially, with every generation we are about 700ms behind the live stream, which is roughly in line with the average human commentator.

As frames are sent via LiveKit, we add them to a queue to be processed by the model. Once we have three frames, we pop them off the queue and send them to the model. By the time the model returns, many items in the queue will be stale, so we discard most of them and keep only the latest. We also run the Cartesia audio generation concurrently, in a separate worker task, so we can send frames to the Cerebrium endpoint in parallel, giving a consistent stream of text and audio and shaving off some time.
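To make the queueing idea concrete, here is a minimal sketch of the "drop stale items, keep the freshest" pattern, using hypothetical names (frame_queue, next_batch); the real handling lives in handle_video_track() and audio_worker() below.

import asyncio

# Hypothetical illustration of the batching/staleness logic described above
frame_queue: asyncio.Queue = asyncio.Queue()

async def next_batch(batch_size: int = 3) -> list:
    """Wait until enough frames have accumulated, then return only the freshest ones."""
    while frame_queue.qsize() < batch_size:
        await asyncio.sleep(0.05)  # LiveKit delivers a frame roughly every 50ms

    frames = []
    while not frame_queue.empty():
        frames.append(frame_queue.get_nowait())
    # Anything older than the latest batch is already stale, so we drop it
    return frames[-batch_size:]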

Let’s get into the code.

import asyncio
import logging
import os
from livekit import rtc
from collections import deque
import time
from dotenv import load_dotenv
import base64
import httpx
from queue import Queue
from threading import Thread, Lock
import pyaudio
from cartesia import Cartesia
import requests
import numpy as np
import cv2
from fastapi import FastAPI
import sys

from livekit.agents import JobContext, WorkerOptions, cli, llm
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import silero
from livekit.agents.llm import ChatMessage, ChatImage
from livekit.agents import metrics, WorkerType

app = FastAPI()

logger = logging.getLogger("ai-commentator")
logger.setLevel(logging.INFO)
load_dotenv()

SPORT_CONTEXT = """You are an AI sports commentator specializing in basketball analysis. 
Your expertise includes:
- Understanding basketball gameplay and strategy
- Recognizing player movements and formations
- Identifying key moments in the game
- Providing engaging, real-time commentary

Keep your observations concise, natural, and focused on the most interesting aspects of the game.
Maintain an enthusiastic but professional tone, similar to professional sports broadcasters."""
sport_voice_id = "41534e16-2966-4c6b-9670-111411def906"
sport_emotional_controls = {
                            "speed": "fastest",
                            "emotion": ["positivity:highest", "surprise:highest"],
                        }
sport_question = """
            Provide the next micro-moment for this exciting game between the warriors and mavericks.
            • Must be a *single very short sentence* but make sure to create a suspenseful commentary.
            • Avoid reusing any previous sentence verbatim.
            • Do not mention player names or player numbers.
            • Keep it intense, but do not repeat yourself.
        """

MOVIE_CONTEXT = """You are a magical storyteller welcoming viewers into an enchanted forest world. Your tale begins with a peaceful woodland scene that sets the stage for adventure.

Your storytelling style:
- Paint vivid pictures of the forest's natural beauty
- Bring the gentle morning atmosphere to life
- Notice the small, delightful details of nature
- Build a sense of peaceful wonder
- Let the forest's magic unfold gradually

Remember:
- Keep each line brief (under 9 words)
- Start with the forest setting and atmosphere
- Introduce characters only when they appear
- Build anticipation through gentle observation
- Let the morning forest charm shine through

You're opening the door to a magical world - make the entrance enchanting!"""
movie_voice_id = "97f4b8fb-f2fe-444b-bb9a-c109783a857a"
movie_emotional_controls = {
    "speed": "normal",
    "emotion": ["positivity:highest", "surprise:highest",  "curiosity:highest"],
}
movie_question = """
            Provide the next micro-moment for this magical story of Bucks adventure in the enchanted forest.
            • Must be a *single short sentence* but make sure to create a suspenseful story.
            • Avoid reusing any previous sentence verbatim.
            • Keep it enchanting, but do not repeat yourself.
        """

MAX_QUEUE_SIZE = 10
audio_queue = asyncio.Queue(maxsize=MAX_QUEUE_SIZE)
audio_lock = Lock()
is_speaking = False

cartesia_client = Cartesia(api_key=os.environ.get("CARTESIA_API_KEY"))
ws = cartesia_client.tts.websocket()

# Set up Cartesia output format
output_format = {
    "container": "raw",
    "encoding": "pcm_f32le",
    "sample_rate": 44100,
}

conversation_history = []
MAX_HISTORY_LENGTH = 5

Above we do all the initializations with a few things to point out:

  • We show how the prompt, voice and emotional controls change for each example, since we have both a movie-trailer example and a basketball-game example. We found the commentary to be much better when the model had some context about the video.

  • We set up the Cartesia websocket instance and the queue that contains the text returned from the Cerebrium endpoint that needs to be processed by Cartesia.

  • Lastly, we store the conversation history, but only up to a length of 5, since a longer history increases latency.

Let's set up the function that calls the Cerebrium endpoint, as well as the function that takes items off the queue and generates the voice frames with Cartesia:

async def audio_worker():
    global is_speaking, audio_stream, audio_source
    while True:
        try:
            queue_item = await audio_queue.get()
            if queue_item is None:
                audio_queue.task_done()  # Mark the None sentinel as done
                break

            text, video_timestamp = queue_item
            with audio_lock:
                is_speaking = True
                try:
                    print(f"Cartesia processing video {video_timestamp:.2f}s: {text}")
                    audio_chunks = []
                    for output in ws.send(
                        model_id="sonic-english",
                        transcript=text,
                        voice_id=voice_id,
                        output_format=output_format,
                        stream=True,
                        _experimental_voice_controls=emotional_controls
                    ):
                        buffer = output["audio"]
                        audio_chunks.append(buffer)

                    full_buffer = b''.join(audio_chunks)
                    audio_data = np.frombuffer(full_buffer, dtype=np.float32)
                    audio_data = (audio_data * 32767).astype(np.int16)

                    audio_queue.task_done()

                    if audio_source:
                        audio_frame = rtc.AudioFrame(
                            data=audio_data.tobytes(),
                            samples_per_channel=len(audio_data),
                            sample_rate=44100,
                            num_channels=1
                        )
                        await audio_source.capture_frame(audio_frame)
                finally:
                    is_speaking = False
                
            if not audio_queue.empty():
                # Remove skipped items from conversation history
                while not audio_queue.empty():
                    try:
                        skipped_text, _ = await audio_queue.get()
                        if skipped_text in conversation_history:
                            conversation_history.remove(skipped_text)
                        audio_queue.task_done()
                    except asyncio.QueueEmpty:
                        break
        except Exception as e:
            print(f"Error in audio worker: {e}")
            audio_queue.task_done()

def format_conversation_history(history):
    """Convert history into OpenAI chat format"""
    formatted_history = [
        {
            "role": "system",
            "content": AGENT_CONTEXT
        }
    ]
    
    for message in history:
        formatted_history.append({
            "role": "assistant",
            "content": message
        })
    return formatted_history


def generate_commentary_with_api(frames_base64, conversation_history):
    global question

    API_URL = "https://api.cortex.cerebrium.ai/v4/p-xxxxxx/realtime-video-explainer/run"
    
    headers = {
        "Authorization": f"Bearer {os.environ.get('CEREBRIUM_API_KEY')}",
        "Content-Type": "application/json"
    }

    payload = {
        "images": frames_base64,
        "question": question,
        "temperature": 0.5,
        "conversation_history": conversation_history
    }

    current_sentence = ""
    recent_sentences = set(msg["content"] for msg in conversation_history[-5:] if isinstance(msg, dict) and "content" in msg)
    try:
        start_time = time.time()
        with requests.post(API_URL, json=payload, headers=headers, stream=True) as response:
            response.raise_for_status()
            first_word_returned = False
            for chunk in response.iter_content(chunk_size=None, decode_unicode=True):
                if chunk:
                    current_sentence += chunk
                    words = current_sentence.split()
                    
                    if not first_word_returned and words:
                        print(f"Time to first word: {time.time() - start_time:.2f} seconds")
                        first_word_returned = True
                    
                    if any(current_sentence.rstrip().endswith(p) for p in ['.', '!', '?']):
                        trimmed = current_sentence.strip()
                        
                        if trimmed.lower() not in (sent.lower() for sent in recent_sentences):
                            yield trimmed
                            recent_sentences.add(trimmed)
                            print(f"Yielded sentence: {trimmed}")
                            
                        current_sentence = ""
            
            trimmed = current_sentence.strip()
            if trimmed and trimmed.lower() not in (sent.lower() for sent in recent_sentences):
                yield trimmed
                
    except requests.RequestException as e:
        print(f"Error calling API: {e}")
        yield "Error generating commentary."

In the above we have the following:

  • We set up a simple function that makes a request to the Cerebrium endpoint, which streams back the text. However, we only yield a result once a full sentence has been constructed, and only then send it to Cartesia for processing. If the text is not submitted as a whole sentence, Cartesia renders it with inconsistent voicing, and it also affects your API usage with them.

  • Our audio_worker() function (which runs as a background asyncio task) takes items off the processing queue and sends them to Cartesia with our voice ID and emotional controls. We then convert the response into the appropriate format before sending it back to LiveKit.

We then need to create two functions that handle the processing of frames streamed from LiveKit. Add the following to your main.py:

async def handle_video_track(track: rtc.Track):
        frames = []
        start_time = time.time()
        last_process_time = time.time() 

        video_stream = rtc.VideoStream(track)
        try:
            async for event in video_stream:
                current_time = time.time()                
                # Skip frame processing if audio queue is not empty
                if not audio_queue.empty():
                    continue
                
                # Collect frame every 100ms
                if (current_time - start_time) >= 0.1:
                    frames.append(event.frame)
                    start_time = current_time
                
                # Keep only latest frame
                if len(frames) > 1:
                    frames = frames[-1:]
                
                # Process frames no more frequently than every 2 seconds
                # and only if we're not currently speaking
                if (len(frames) >= 1 and 
                    (current_time - last_process_time) >= 2.0 and 
                    not is_speaking and 
                    audio_queue.empty()):
                    
                    logger.info(f"Processing frame at {current_time}")
                    await process_frames(frames)
                    frames = []
                    last_process_time = current_time
                    
                    # Clear any accumulated frames to prevent backlog
                    frames = []

        except Exception as e:
            logger.error(f"Error processing video stream: {e}")
        finally:
            await video_stream.aclose()

async def process_frames(frames):
        """Process the collected frames"""

        global conversation_history

        logger.info(f"Processing batch of {len(frames)} frames")
        print(f"Processing batch of {len(frames)} frames")
        encoded_frames = []

        for frame in frames:
                       
            rgb_frame = frame.convert(rtc.video_frame.proto_video.VideoBufferType.RGB24)
            frame_data = rgb_frame.data
            frame_array = np.frombuffer(frame_data, dtype=np.uint8)
            
            frame_array = frame_array.reshape((rgb_frame.height, rgb_frame.width, 3))
                
            # Additional check for overall brightness
            mean_value = np.mean(frame_array)
            if mean_value < 20:  # Increased threshold
                print(f"Skipping dark frame (mean value: {mean_value:.2f})")
                continue
            
            _, buffer = cv2.imencode('.jpg', frame_array)
            
            frame_b64 = base64.b64encode(buffer).decode('utf-8')
            encoded_frames.append(frame_b64)
        
        commentary_generator = generate_commentary_with_api(encoded_frames, format_conversation_history(conversation_history))
        timestamp = time.time()
        
        for sentence in commentary_generator:
            conversation_history.append(sentence)
            # Keep only the most recent entries
            if len(conversation_history) > MAX_HISTORY_LENGTH:
                conversation_history = conversation_history[-MAX_HISTORY_LENGTH:]
                
            if audio_queue.full():
                try:
                    audio_queue.get_nowait()
                    audio_queue.task_done()
                    logger.info("Dropped the oldest sentence to make room for the new one.")
                except asyncio.QueueEmpty:
                    pass 
        
            await audio_queue.put((sentence, timestamp))

In the above code we do the following:

  • We attach the handle_video_track() function to the video stream from LiveKit. It contains the logic for when to kick off commentary generation: it limits the number of frames we capture before processing (which affects latency) and spaces the frames out in time so the model has more to work with.

  • The process_frames() function handles the frame conversion. We check that the frames are of good quality (when the stream starts, the first frames are black, so we exclude these), then send them to Cerebrium along with the conversation history and wait for Cartesia to generate the audio before continuing the cycle.

Now let’s put this all together with the LiveKit workers

async def entrypoint(ctx: JobContext):
    global audio_source, audio_track, AGENT_CONTEXT, voice_id, emotional_controls, question

    # Select the context, voice and question based on the room name
    if ctx.room.name.lower().startswith('movie'):
        AGENT_CONTEXT = MOVIE_CONTEXT
        voice_id = movie_voice_id
        emotional_controls = movie_emotional_controls
        question = movie_question
    else:  # Default to basketball/sports context
        AGENT_CONTEXT = SPORT_CONTEXT
        voice_id = sport_voice_id
        emotional_controls = sport_emotional_controls
        question = sport_question


    room = rtc.Room()

    audio_task = asyncio.create_task(audio_worker())

    @ctx.room.on("track_subscribed")
    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.TrackPublication,
        participant: rtc.RemoteParticipant,
    ):
        
        if track.kind == rtc.TrackKind.KIND_VIDEO:
            asyncio.create_task(handle_video_track(track))


    await ctx.connect()
    logger.info("Connected to the room initialized.")
    # Create audio source and track
    audio_source = rtc.AudioSource(sample_rate=44100, num_channels=1)
    audio_track = rtc.LocalAudioTrack.create_audio_track("ai-voice", audio_source)
    
    # Publish the audio track to the room
    await ctx.room.local_participant.publish_track(audio_track)

if __name__ == "__main__":
    # Configure the worker options with the revised entrypoint
    worker_options = WorkerOptions(entrypoint_fnc=entrypoint)
    cli.run_app(worker_options)

In the above code snippet, we are just bringing everything together in terms of:

  • Creating the audio worker as a background task

  • Subscribing the handle_video_track() function to the video stream

  • Creating an audio source which will stream audio back to our LiveKit instance so the user can hear it on the webpage.

Lastly, before we deploy, let's add all the secrets to our .env file and then add them to your Cerebrium dashboard secrets as well.

LIVEKIT_API_KEY=xxxx
LIVEKIT_API_SECRET=xxx
LIVEKIT_URL=xxxx
CARTESIA_API_KEY=xxxx
CEREBRIUM_API_KEY=xxxx

You should have the LiveKit values from the previous step, and you can get the Cerebrium and Cartesia keys from their respective dashboards.

Now, to deploy this, run cerebrium deploy. Note that min_replicas is set to 1, since the workers need to be active the entire time. Voila, your instances should be running.

Frontend Setup

To generate a demo frontend, I used bolt.new. The prompt I used was:

Can you build me a landing page with an explainer of the project (placeholder for now) and then links in the footer to go to the blog post and source code. Then there must be a button that takes me to a separate page. It should have a main video view with a carousel of 3 videos under it

I made a few tweaks after this, but it should get you 95% of the way. If not, you can check out the frontend GitHub repo here. I am not going to go into the full implementation of the frontend, but will point out a few setup snippets you need in order to get it to interact with your LiveKit instance.

useEffect(() => {
    if (roomName) {
      initializeRoom();
    }
    return () => {
      roomRef.current?.disconnect();
    };
  }, [roomName]);

  const initializeRoom = async () => {
    const room = new Room({
      adaptiveStream: true,
      dynacast: true,
      videoCaptureDefaults: {
        resolution: VideoPresets.h720.resolution,
      },
      reconnect: true,
      maxRetries: 3,
    });

    room.on(RoomEvent.TrackSubscribed, (track, publication, participant) => {
      console.log('Track subscribed:', track.kind);
      if (track.kind === 'audio') {
        const audioElement = track.attach();
        audioElement.volume = 1.0;  // Ensure volume is up
        
        // Store the audio element so we can control it later
        audioElements.current.push(audioElement);
      }
    });
    

    try {

      const apiUrl = import.meta.env.VITE_API_URL;
      const authToken = import.meta.env.VITE_AUTH_TOKEN;
      const livekitUrl = import.meta.env.VITE_LIVEKIT_WS_URL;

      // Get token from your backend
      const response = await fetch(apiUrl, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${authToken}`,
          'Origin': window.location.origin,
        },
        body: JSON.stringify({
          room_name: roomName,
        }),
      });

      if (!response.ok) {
        throw new Error(`Token request failed: ${response.statusText}`);
      }

      const { result: { token } } = await response.json();
      
      // For local development, use ws:// instead of wss:// if not on HTTPS
      
      console.log('Connecting to LiveKit server:', livekitUrl);
      
      await room.connect(livekitUrl, token, {
        autoSubscribe: true,
        rtcConfig: {
          iceTransportPolicy: 'all',
          iceServers: []
        }
      });

      console.log('Connected to room:', room.name);
      roomRef.current = room;

    } catch (error) {
      console.error('Connection failed:', error);
      // Add more detailed error logging
      if (error instanceof Error) {
        console.error('Error details:', error.message);
      }
    }
  };

In the example above, we send a request to the Cerebrium endpoint to retrieve a LiveKit token, which is then used to authenticate and initialize our LiveKit room. Below, we write the logic to start streaming the video to our LiveKit worker when the user clicks the play button. We keep a reference to all the published tracks so that, if the user pauses or moves to a different video, we can clean up the state across all of them.

const handleVideoPlayPause = async (isPlaying: boolean) => {
    console.log(isPlaying ? 'Video playing' : 'Video paused');
    setIsPlaying(isPlaying);

    if (roomRef.current) {
      if (isPlaying && videoRef.current) {
        try {

          toggleAudio(true);

          const mediaStream = videoRef.current.captureStream();
          const videoTrack = mediaStream.getVideoTracks()[0];
          const audioTrack = mediaStream.getAudioTracks()[0];
          // Store published tracks for cleanup
          publishedTracksRef.current = [];
          
          if (videoTrack) {
            console.log('publishing video track');
            const publishedVideo = await roomRef.current.localParticipant.publishTrack(videoTrack, {
              source: Track.Source.Unknown,
              stopMicTrackOnMute: true,
            });
            publishedTracksRef.current.push(publishedVideo);
          }
          if (audioTrack) {
            console.log('publishing audio track');
            const publishedAudio = await roomRef.current.localParticipant.publishTrack(audioTrack, {
              source: Track.Source.Unknown,
              name: 'audio-playback',
              dtx: true,
              forceStereo: true,
              red: true,
              stopMicTrackOnMute: false
            });
            publishedTracksRef.current.push(publishedAudio);
          }
        } catch (error) {
          console.error('Error publishing video:', error);
        }
      } else {
        // Cleanup published tracks when video is paused
        console.log(roomRef.current?.localParticipant.trackPublications);
        for (const publication of publishedTracksRef.current) {
          try {
            console.log('Attempting to unpublish track with SID:', publication.trackSid);
            if (publication.track?.kind === 'audio') {
              toggleAudio(false);
            }
            await roomRef.current.localParticipant.unpublishTrack(publication.track);
          } catch (error) {
            console.warn('Error unpublishing track:', error);
          }
        }
        publishedTracksRef.current = [];
      }
    }
  };

To get this frontend to work you will need to create your own .env file with the following values:

VITE_API_URL=https://api.cortex.cerebrium.ai/v4/p-xxxxxx/realtime-video-explainer/create_token
VITE_AUTH_TOKEN=<CEREBRIUM_AUTH_TOKEN>
VITE_LIVEKIT_WS_URL=wss://xxxx

To run the frontend locally, simply run npm run dev. Otherwise, you can clone the repository and deploy it on Vercel by attaching your repository.

Further Improvements

  • An ambitious project would be to identify moments of high tension and inject special effects into the frames as keywords are triggered, such as scoring a basket.

  • If you watch a basketball broadcast, it includes background noise from the crowd and the squeak of sneakers, which adds to the atmosphere. It should be possible to add this to the audio output.

Conclusion

The fusion of AI technologies like LiveKit, Cerebrium, and Cartesia opens up exciting possibilities for real-time commentary that rivals human broadcasters. By leveraging video frame analysis, scalable AI infrastructure, and expressive voice synthesis, we’ve demonstrated how AI can deliver insightful, emotionally engaging narration with minimal latency. While there are still challenges to overcome—such as optimizing delays and enhancing the realism of the audio experience—the progress so far is a promising glimpse into the future of live broadcasting.

Looking ahead, the potential for further innovation is immense. From dynamically reacting to high-tension moments with special effects to incorporating ambient sounds like crowd noise and sneaker squeaks, AI commentators can become even more immersive and lifelike. As these technologies continue to evolve, they could revolutionize how we experience sports, entertainment, and beyond—bringing audiences closer to the action than ever before.
