Tutorial

Jul 21, 2024

Creating a realtime RAG voice agent

Michael Louis

Founder

Educational resources are highly accessible today. Anyone, anywhere can watch online content from some of the most renowned institutions and individuals in the world, as long as they have an internet connection. However, something has been missing from this loop: what happens when you don't understand something in the lecture? How do you ask a video a question? Well, what if you could do exactly that?

In this tutorial, we build our very own personalised tutor embodying Andrej Karpathy, the former Director of AI at Tesla and a founding member of OpenAI. We selected a portion of his YouTube videos to train the tutor with. While you watch, you can ask him about various concepts in the lecture and have him explain them to you in Andrej's own voice!

You can find the final code example here and a demo here.

Tutorial:

Cerebrium will be hosting the entire application, so if you don’t have a Cerebrium account you can create one by signing up here (we give you $30 in free credits). Follow the documentation here to get set up.

Data processing

In order to get started, we need to create our Cerebrium project.

cerebrium init voice-educator

This creates two key files:

  • main.py - The entrypoint file where our code lives.

  • cerebrium.toml - The configuration file that contains all our build and environment settings.

We first need to start with our data processing task - downloading Andrej’s YouTube videos, then transcribing, chunking and embedding them. Since this is a one-off task and not part of our live application, create a new Python file called data-processing.py.

Following good practice, let us create a Python virtual environment to make sure our application dependencies are consistent. We will also be using environment variables, so install the python-dotenv Pip package to enable this:

python -m venv educator
source educator/bin/activate
pip install python-dotenv

You should be in your new environment!

In order to create the RAG element of our application, we need to download Andrej’s Youtube videos, transcribe them and finally upload them to our vector database. This will allow the application to retrieve the necessary transcriptions it needs in order to give the embedded LLM the correct context.

  1. To download the videos locally, we can use the pytube library. In this tutorial we only need the audio track of each video. Run pip install pytube, and create the following script:


    from pytube import YouTube
    import os

    def download_videos(link: str, download_path: str):
        """Download the audio-only stream of a YouTube video and return its title and file path."""
        if not os.path.exists(download_path):
            os.makedirs(download_path)

        yt = YouTube(link)

        # We only need the audio track for transcription and voice cloning
        audio_download = yt.streams.get_audio_only()

        print("Downloading Audio...")
        audio_download.download(filename=f"{yt.title}.mp3", output_path=download_path)
        download_file_path = f"{download_path}/{yt.title}.mp3"

        return yt.title, download_file_path
    
    


  2. Next we need to transcribe the audio files into text. We do this using the Deepgram API. You can sign up for a Deepgram account here (they have a generous free tier) and create an API key right from the initial screen. Run pip install deepgram-sdk httpx. Create a .env file in your project root and add your API key to it - we named ours DEEPGRAM_API_KEY.


    We can then create the following code to transcribe the audio file and return the text.


    import os

    from deepgram import (
        DeepgramClient,
        PrerecordedOptions,
        FileSource,
    )
    import httpx
    from dotenv import load_dotenv

    # Load environment variables from .env file
    load_dotenv()
    
    def transcribe_file(audio_file: str):
        print('transcribing')
        try:
            # STEP 1 Create a Deepgram client using the API key
            deepgram = DeepgramClient(os.getenv("DEEPGRAM_API_KEY"))
    
            with open(audio_file, "rb") as file:
                buffer_data = file.read()
    
            payload: FileSource = {
                "buffer": buffer_data,
            }
    
            #STEP 2: Configure Deepgram options for audio analysis
            options = PrerecordedOptions(
                model="nova-2",
                smart_format=True,
            )
    
            # STEP 3: Call the transcribe_file method with the text payload and options
            response = deepgram.listen.prerecorded.v("1").transcribe_file(payload, options, timeout=httpx.Timeout(300.0, connect=10.0))
            return response.results.channels[0].alternatives[0].transcript
    
        except Exception as e:
            print(f"Exception: {e}")


  3. Next, we need to embed the text in a vector database so that our application can easily retrieve the context our LLM needs to respond effectively. There are many articles on the different models and strategies to choose for this task. We recommend looking at the tool here to see what might suit your use case.



    For simplicity we will use OpenAI for embeddings and Pinecone as our vector store. You can sign up for an OpenAI account here and a Pinecone account here. We will use the Langchain framework to create our RAG application, so we will also use it to chunk, upload, embed and store our transcribed text.


    You will need to create an index in Pinecone that we will upload our embeddings to. Since we are using the OpenAI embeddings model, you must set the dimension to 1536, and we will be using the cosine metric to measure similarity.


    Run pip install -qU langchain-text-splitters langchain_openai langchain_pinecone. You will then need to fetch API keys from OpenAI and Pinecone and add them to your .env file. We called ours OPENAI_API_KEY and PINECONE_API_KEY respectively.
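
    If you would rather create the Pinecone index from code than from the dashboard, here is a minimal sketch using the Pinecone Python SDK (assuming the v3+ client and a serverless index on AWS):

    import os
    from dotenv import load_dotenv
    from pinecone import Pinecone, ServerlessSpec

    load_dotenv()

    pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
    pc.create_index(
        name="andrej-youtube",  # the index name we reference throughout this tutorial
        dimension=1536,         # matches the output size of OpenAI's embedding model
        metric="cosine",        # the similarity metric we chose above
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )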




  4. You can then implement the code below:

from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_pinecone import PineconeVectorStore
from langchain_openai import OpenAIEmbeddings


def embed_text(text: str):

    print('Embedding')

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=200,
        length_function=len,
        is_separator_regex=False,
    )
    split_text = text_splitter.split_text(text)
    return split_text 
    

def save_embeddings_to_db(title: str, url: str, docs: list[str]):

    index_name = "andrej-youtube"
    embeddings = OpenAIEmbeddings()
    # Connect to the Pinecone index and insert the chunked docs as its contents
    PineconeVectorStore.from_texts(docs, embeddings, index_name=index_name)

The above code takes our transcribed text, chunks it based on the chunk size and overlap we set, and then uploads it to our index in Pinecone.

Lastly, let us bring this all together:

if __name__ == "__main__":

    video_links = [
        "https://www.youtube.com/watch?v=l8pRSuU81PU",
        "https://www.youtube.com/watch?v=zduSFxRajkE",
        "https://www.youtube.com/watch?v=zjkBMFhNj_g",
        "https://www.youtube.com/watch?v=kCc8FmEb1nY"
    ]

    for link in video_links:
        title, download_path = download_videos(link, "./videos")
        texts = transcribe_file(download_path)
        docs = embed_text(texts)
        save_embeddings_to_db(title, link, docs=docs)

You can then run the script with: python data-processing.py. You should see some logs and it will take about 5 minutes to execute.

You should then be able to navigate to your index in Pinecone and see some records.
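
To confirm the upload from code instead, a quick check with the Pinecone client (again assuming the v3+ SDK) works as well:

import os
from dotenv import load_dotenv
from pinecone import Pinecone

load_dotenv()

pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
# The printed stats include a total_vector_count for the index
print(pc.Index("andrej-youtube").describe_index_stats())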

Voice Agent

We previously did a tutorial on building a voice agent on Cerebrium using Deepgram, Daily and the Pipecat framework. If you haven’t read through it, we recommend you do so, as here we will only cover the changes we made and how we implemented RAG with Pipecat.

What is different about this implementation is we will be using an external API for our LLM instead of a local model. We did this for two reasons:

  1. To showcase how you can utilize external LLMs if you require more performant models, such as those from OpenAI or Anthropic. This does come with a latency trade-off (~800ms for the API call vs ~100ms locally).

  2. To show how you can run the implementation on a CPU instead of an H100 with similarly performant models, so you don’t hit capacity or cost constraints.

We recommend you clone the repository at this point; below, we explain the changes we made and show only the relevant snippets.

To start, let us upload our secrets from our .env file to our Cerebrium account so we can use them in our application. Navigate to “Secrets” in your Cerebrium Dashboard and upload your .env file - you should see your values populate. We will reference these values later in the tutorial.
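
The snippets that follow reference a get_secret helper to read these values. As a purely hypothetical minimal version (assuming Cerebrium exposes your Secrets to the application as environment variables; the repository ships its own implementation), it could look like this:

import os

def get_secret(name: str) -> str:
    # Assumes Cerebrium injects Secrets as environment variables at runtime
    value = os.environ.get(name)
    if value is None:
        raise RuntimeError(f"Secret '{name}' is not set in your Cerebrium Secrets")
    return value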

In your cerebrium.toml file, make sure you have the following set:

[cerebrium.deployment]
name = "educator"
python_version = "3.11"
include = "[./*, main.py, cerebrium.toml]"
exclude = "[.*]"
shell_commands = []
docker_base_image_url="prod.registry/daily:latest"

[cerebrium.hardware]
cpu = 2
memory = 8.0
compute = "CPU"
provider = "aws"
region = "us-east-1"

[cerebrium.scaling]
min_replicas = 0
max_replicas = 5
cooldown = 60

[cerebrium.dependencies.pip]
deepgram-sdk = "latest"
"pipecat-ai[silero, daily, openai, deepgram, elevenlabs]" = "latest"
aiohttp = ">=3.9.4"
torchaudio = ">=2.3.0"
channels = ">=4.0.0"
requests = "==2.32.2"
openai = "latest"
langchain = "latest"
langchain_community = "latest"
langchain_openai = "latest"
langchain_pinecone = "latest"
pinecone = "latest"

Here we are:

  • We set our base Docker image to the Daily image that contains the Deepgram model locally. This makes the STT conversion extremely quick, since it happens locally rather than over the network.

  • We set our compute type to CPU since we are calling an API for our LLM and do not need a GPU.

  • We listed the pip packages we need for our application.


Voice Cloning

In order to make our demo more realistic, we thought we would clone Andrej’s voice using ElevenLabs so that the voice speaking in the video sounds less robotic. You can sign up for an ElevenLabs account here. It comes with a generous free tier; however, if you would like to do the voice cloning, you need to upgrade to their Starter plan, which costs $5.

In order to clone a voice on ElevenLabs, you need to upload a voice recording that is less than 4MB in size. Since we already downloaded audio files in our data-processing step, we just need to cut one of them down using a platform like Clideo (it's free), or trim it from code as sketched below. Once you have cut down one of the files, you can upload it to ElevenLabs - you should then get back a voice ID that we will use later in our application.
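
If you prefer to trim the clip from code, a quick sketch with pydub (an assumption on our part - it is not one of this project's dependencies and it needs ffmpeg installed) could look like this:

from pydub import AudioSegment

# Load one of the audio files downloaded during the data-processing step
audio = AudioSegment.from_file("./videos/<video title>.mp3")

# Keep roughly the first minute; pydub slices are in milliseconds
clip = audio[:60 * 1000]

# Export at a modest bitrate so the sample stays well under the 4MB limit
clip.export("andrej_voice_sample.mp3", format="mp3", bitrate="64k")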

You need to upload your ElevenLabs API key and the voice ID to your Secrets! We called ours ELEVENLABS_API_KEY and ELEVENLABS_VOICE_ID - we will use these in the next step.

Pipecat

Below is our base Pipecat implementation with a few changes:

  • We use ElevenLabs for our TTS element, using the voice we cloned in the previous step. We updated the voice ID to the one ElevenLabs returned.

  • We implement the Pipecat Langchain integration to create a conversation agent that can remember the history of our conversation. We will edit this section of the code in the next step.

    from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
    from langchain.chains import create_history_aware_retriever, create_retrieval_chain
    from langchain_community.chat_message_histories import ChatMessageHistory
    from langchain_core.chat_history import BaseChatMessageHistory
    from langchain_core.runnables.history import RunnableWithMessageHistory
    from langchain_openai import ChatOpenAI
    
    message_store = {}
    
    def get_session_history(session_id: str) -> BaseChatMessageHistory:
        if session_id not in message_store:
            message_store[session_id] = ChatMessageHistory()
        return message_store[session_id]
    
    async def main(room_url: str, token: str):
        
        async with aiohttp.ClientSession() as session:
            transport = DailyTransport(
                room_url,
                token,
                "Andrej Karpathy",
                DailyParams(
                    audio_out_enabled=True,
                    transcription_enabled=True,
                    vad_enabled=True,
                    vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)),
                )
            )
    
            stt = DeepgramSTTService(
                 name="STT",
                 api_key=None,
                 url='ws://127.0.0.1:8082/v1/listen'
            )
    
            tts = ElevenLabsTTSService(
                aiohttp_session=session,
                api_key=get_secret("ELEVENLABS_API_KEY"),
                voice_id=get_secret("ELEVENLABS_VOICE_ID"),
            )
    
            ## Langchain section - we will replace this with our RAG chain in the next step
            prompt = ChatPromptTemplate.from_messages(
                [
                    ("system",
                     "Be nice and helpful. Answer very briefly and without special characters like `#` or `*`. "
                     "Your response will be synthesized to voice and those characters will create unnatural sounds.",
                     ),
                    MessagesPlaceholder("chat_history"),
                    ("human", "{input}"),
                ])
            chain = prompt | ChatOpenAI(model="gpt-4o", temperature=0.7)
            history_chain = RunnableWithMessageHistory(
                chain,
                get_session_history,
                history_messages_key="chat_history",
                input_messages_key="input")
            lc = LangchainProcessor(history_chain)
            ## End of Langchain section
    
            avt = AudioVolumeTimer()
            tl = TranscriptionTimingLogger(avt)
    
            tma_in = LLMUserResponseAggregator()
            tma_out = LLMAssistantResponseAggregator()
    
            pipeline = Pipeline([
                transport.input(),   # Transport user input
                avt,  # Audio volume timer
                stt,  # Speech-to-text
                tl,  # Transcription timing logger
                tma_in,              # User responses
                lc,                 # LLM
                tts,                 # TTS
                transport.output(),  # Transport bot output
                tma_out,             # Assistant spoken responses
            ])
    
            task = PipelineTask(pipeline, PipelineParams(
                allow_interruptions=True,
                enable_metrics=True,
                report_only_initial_ttfb=True,
            ))

Langchain RAG Pipeline

In order to create a RAG pipeline with Langchain, we can just create a retrieval chain. This requires:

  • An LLM, which in this case will be OpenAI’s new GPT-4o mini model.

  • OpenAI for our embeddings and Pinecone for our vector store - just like in the data processing step.

  • RunnableWithMessageHistory from Langchain, so that our message history is passed along with the retrieved context to our LLM.

# create_stuff_documents_chain lives in langchain's combine_documents module
from langchain.chains.combine_documents import create_stuff_documents_chain

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)
vectorstore = PineconeVectorStore.from_existing_index(
    "andrej-youtube", OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
# answer_prompt is the ChatPromptTemplate defined earlier; create_stuff_documents_chain
# expects it to include a {context} placeholder for the retrieved documents
question_answer_chain = create_stuff_documents_chain(llm, answer_prompt)
rag_chain = create_retrieval_chain(retriever, question_answer_chain)

history_chain = RunnableWithMessageHistory(
    rag_chain,
    get_session_history,
    history_messages_key="chat_history",
    input_messages_key="input",
    output_messages_key="answer")

You could also implement a history_aware_retriever from Langchain here, which generates a new prompt based on your chat history and original question before querying the vector store. We found this added too much latency and didn’t make a big enough impact on the results.

Langchain’s retrieval chain returns a dict whose response lives under the “answer” key, as indicated by the output_messages_key parameter above. We therefore need to extend the Pipecat Langchain processor to cater for this. In helpers.py, add the following code:

from typing import Union

from loguru import logger

from pipecat.processors.frameworks.langchain import LangchainProcessor

from langchain_core.messages import AIMessageChunk
from langchain_core.runnables import Runnable

from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.frames.frames import (
    Frame,
    AudioRawFrame,
    InterimTranscriptionFrame,
    TranscriptionFrame,
    TextFrame,
    StartInterruptionFrame,
    LLMFullResponseStartFrame,
    LLMFullResponseEndFrame,
    LLMResponseEndFrame,
    LLMResponseStartFrame,
    TTSStoppedFrame,
    MetricsFrame
)

class LangchainRAGProcessor(LangchainProcessor):
    def __init__(self, chain: Runnable, transcript_key: str = "input"):
        super().__init__(chain, transcript_key)  
        self._chain = chain
        self._transcript_key = transcript_key

    @staticmethod
    def __get_token_value(text: Union[str, AIMessageChunk]) -> str:
        match text:
            case str():
                return text
            case AIMessageChunk():
                return text.content
            case dict() as d if 'answer' in d:
                return d['answer']
            case _:
                return ""
            
    async def _ainvoke(self, text: str):
        logger.debug(f"Invoking chain with {text}")
        targetPhrases = [
          "you can continue with the lecture",
          "continue with the lecture",
          "you can continue with lecture",
          "continue with lecture",
          "play the video",
          "continue with the video"
        ]

        ##Simple fuzzy matching by checking if the target phrase is included in the transcript text
        matchFound = any(phrase in text for phrase in targetPhrases)
        if matchFound:
            print("Fuzzy match found for the phrase: 'You can continue with the lecture'")
            return
        
        await self.push_frame(LLMFullResponseStartFrame())
        try:
            async for token in self._chain.astream(
                {self._transcript_key: text},
                config={"configurable": {"session_id": self._participant_id}},
            ):
                await self.push_frame(LLMResponseStartFrame())
                await self.push_frame(TextFrame(self.__get_token_value(token)))
                await self.push_frame(LLMResponseEndFrame())
        except GeneratorExit:
            logger.warning(f"{self} generator was closed prematurely")
        except Exception as e:
            logger.exception(f"{self} an unknown error occurred: {e}")
        finally:
            await self.push_frame(LLMFullResponseEndFrame())

There are three things to notice here:

  • We extend the LangchainProcessor from Pipecat since it already contains a lot of the functionality we need - we simply edited some of its functions.

  • In __get_token_value we look for when the streamed chunk is a dict containing the “answer” key, since that is what our retrieval chain returns - we return that value in this case.

  • In our _ainvoke method (which essentially invokes the Langchain chain) we do fuzzy matching on what the user said in order to detect when they say we can continue with playing the video. We do this to stop the message going to the LLM and generating a response. You could do this with function calling, but for the simplicity of the demo we used fuzzy matching - see the sketch after this list for a slightly more tolerant variant.
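
If simple substring checks prove too brittle against transcription variations, a slightly more tolerant variant (a sketch using Python’s built-in difflib, not what the demo ships with) could score similarity instead:

from difflib import SequenceMatcher

TARGET_PHRASES = [
    "you can continue with the lecture",
    "continue with the lecture",
    "play the video",
    "continue with the video",
]

def is_continue_request(text: str, threshold: float = 0.75) -> bool:
    # Accept either an exact substring hit or a close-enough similarity score
    cleaned = text.lower().strip()
    return any(
        phrase in cleaned or SequenceMatcher(None, cleaned, phrase).ratio() >= threshold
        for phrase in TARGET_PHRASES
    )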

Now, in your main.py, you can add the following under your history_chain variable:

  lc = LangchainRAGProcessor(chain=history_chain)

Deploy to Cerebrium

To deploy this application to Cerebrium you can simply run the command: cerebrium deploy in your terminal.

If it deployed successfully, the CLI will print a confirmation along with your app’s endpoints.

We will add these endpoints to our frontend interface.

Connect frontend

We created a public fork of the Pipecat frontend in order to show you a demo of this application. You can clone the repo here.

Follow the instructions in the README.md and then populate the following variables in your .env.development.local file:

VITE_SERVER_URL=https://api.cortex.cerebrium.ai/v4/p-xxxxx/<APP_NAME> # This is the base URL. Do not include the function names.
VITE_SERVER_AUTH= # This is the JWT token you can get from the API Keys section of your Cerebrium Dashboard.

You can now run yarn dev and go to the URL http://localhost:5173/ to test your application!

Conclusion

This tutorial shows how you can build a scalable, personalised tutor using Cerebrium and a variety of complementary services (Daily, Deepgram, ElevenLabs, OpenAI etc). Combining RAG and voice unlocks a myriad of applications, and since it's fully customizable you can make your own trade-offs in terms of latency, cost and accuracy.

In a space that is moving as quickly as AI, it is our job at Cerebrium to constantly innovate and think about what the future of industries might look like, so we can make sure we are in the best position to support them. With our entire team being from South Africa, education has always been an important topic for us, and so we thought about what impact we could have on an industry as important as this one. As Nelson Mandela famously said: “Education is the most powerful weapon which you can use to change the world.”

Tag us as @cerebriumai so we can see what you build, and please feel free to ask questions or send feedback to us in our Slack or Discord communities.
