Pydantic AI Streamlit interface

I was going to test my agent with a Streamlit interface, but Pydantic AI is oddly difficult to work with in terms of handling responses. Has anyone found a good reference?

1 Like

@kai-feinberg I actually have the PERFECT thing for you! I recently created a Streamlit interface to work with the GitHub agent I’ve been building on my channel. I’ll paste the code in here. It takes care of a ton, including:

  • Managing conversation history in the Streamlit state
  • Converting the Pydantic AI message format into what gets displayed in Streamlit
  • Streaming the response to the frontend instead of just dumping the final response all at once
  • Storing tool calls and responses in the conversation history

You’ll just have to change the GitHub agent import and usage to your agent!
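
Concretely, the agent-specific pieces are mainly the import near the top, the Deps construction, and the run_stream call, so swapping in your own agent can start with just the import. The module and names below are placeholders, not part of the original code:

# Placeholder names -- point this at wherever your own agent and deps live.
from my_agent import my_agent as github_agent, Deps

Keeping the github_agent alias means the rest of the script below can stay unchanged.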

Code:

from __future__ import annotations
from typing import Literal, TypedDict
import asyncio
import os

import streamlit as st
import httpx
import json
import logfire

# Import all the message part classes
from pydantic_ai.messages import (
    ModelMessage,
    ModelRequest,
    ModelResponse,
    SystemPromptPart,
    UserPromptPart,
    TextPart,
    ToolCallPart,
    ToolReturnPart,
    RetryPromptPart,
    ModelMessagesTypeAdapter
)
from github_agent_ai import github_agent, Deps

# Load environment variables if needed
from dotenv import load_dotenv
load_dotenv()

# Configure logfire to suppress warnings (optional)
logfire.configure(send_to_logfire='never')

class ChatMessage(TypedDict):
    """Format of messages sent to the browser/API."""

    role: Literal['user', 'model']
    timestamp: str
    content: str


def display_message_part(part):
    """
    Display a single part of a message in the Streamlit UI.
    Customize how you display system prompts, user prompts,
    tool calls, tool returns, etc.
    """
    # system-prompt
    if part.part_kind == 'system-prompt':
        with st.chat_message("system"):
            st.markdown(f"**System**: {part.content}")
    # user-prompt
    elif part.part_kind == 'user-prompt':
        with st.chat_message("user"):
            st.markdown(part.content)
    # text
    elif part.part_kind == 'text':
        with st.chat_message("assistant"):
            st.markdown(part.content)          


async def run_agent_with_streaming(user_input: str, github_url: str | None = None):
    """
    Run the agent with streaming text for the user_input prompt,
    while maintaining the entire conversation in `st.session_state.messages`.
    """
    # Prepare dependencies
    openai_api_key = os.getenv('OPENAI_API_KEY')
    github_token = os.getenv('GITHUB_TOKEN')
    deps = Deps(
        client=httpx.AsyncClient(),
        openai_api_key=openai_api_key,
        github_token=github_token,
    )

    # If you want to prepend the GitHub URL to the user's prompt:
    if github_url:
        user_input = f"For the GitHub repository {github_url}, {user_input}"

    # Run the agent in a stream
    try:
        async with github_agent.run_stream(
            user_input,
            deps=deps,
            message_history=st.session_state.messages[:-1],  # pass entire conversation so far
        ) as result:
            # We'll gather partial text to show incrementally
            partial_text = ""
            message_placeholder = st.empty()

            # Render partial text as it arrives
            async for chunk in result.stream_text(delta=True):
                partial_text += chunk
                message_placeholder.markdown(partial_text)

            # Now that the stream is finished, we have a final result.
            # Add new messages from this run, excluding user-prompt messages
            filtered_messages = [msg for msg in result.new_messages() 
                               if not (hasattr(msg, 'parts') and 
                                     any(part.part_kind == 'user-prompt' for part in msg.parts))]
            st.session_state.messages.extend(filtered_messages)

            # Add the final response to the messages
            st.session_state.messages.append(
                ModelResponse(parts=[TextPart(content=partial_text)])
            )
    finally:
        await deps.client.aclose()


async def main():
    st.title("GitHub Repository Analyzer")
    st.write("Ask questions about any GitHub repository!")

    # Let the user provide a GitHub URL
    github_url = st.text_input("Enter GitHub Repository URL:", key="github_url")

    # Initialize chat history in session state if not present
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Display all messages from the conversation so far
    # Each message is either a ModelRequest or ModelResponse.
    # We iterate over their parts to decide how to display them.
    for msg in st.session_state.messages:
        if isinstance(msg, (ModelRequest, ModelResponse)):
            for part in msg.parts:
                display_message_part(part)

    # Chat input for the user
    user_input = st.chat_input("What would you like to know about this repository?")

    if user_input:
        # Must have a GitHub URL
        if not github_url:
            st.error("Please enter a GitHub repository URL first.")
            return

        # We append a new request to the conversation explicitly
        st.session_state.messages.append(
            ModelRequest(parts=[UserPromptPart(content=user_input)])
        )
        
        # Display user prompt in the UI
        with st.chat_message("user"):
            st.markdown(user_input)

        # Display the assistant's partial response while streaming
        with st.chat_message("assistant"):
            # Actually run the agent now, streaming the text
            await run_agent_with_streaming(user_input, github_url=github_url)


if __name__ == "__main__":
    asyncio.run(main())
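
One note on the display side: display_message_part above only renders system prompts, user prompts, and plain text, even though tool calls and tool returns are also stored in the conversation history. If you want those to show up too, here is a rough sketch of an extended version. The part_kind strings and the tool_name / content attributes assume the Pydantic AI message classes around 0.0.17, so double-check them against your installed version:

# Sketch: drop-in replacement for display_message_part that also surfaces tool activity.
def display_message_part(part):
    if part.part_kind == 'system-prompt':
        with st.chat_message("system"):
            st.markdown(f"**System**: {part.content}")
    elif part.part_kind == 'user-prompt':
        with st.chat_message("user"):
            st.markdown(part.content)
    elif part.part_kind == 'text':
        with st.chat_message("assistant"):
            st.markdown(part.content)
    # Tool calls made by the model
    elif part.part_kind == 'tool-call':
        with st.chat_message("assistant"):
            st.markdown(f"Calling tool: **{part.tool_name}**")
    # Results returned by the tool
    elif part.part_kind == 'tool-return':
        with st.chat_message("assistant"):
            st.markdown(f"Tool **{part.tool_name}** returned: {part.content}")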

3 Likes

Awesome! Thank you so much, Cole. I’ll give it a shot. What version of Pydantic AI does this use? (I think the most recent is currently 0.0.17.)

1 Like

You are so welcome!

This is using the most recent version! 0.0.17.

1 Like

Just got it up and running! This is absolutely perfect. Thank you again Cole!

2 Likes

Awesome - you are so welcome! :smiley:

Hello @ColeMedin, I tried your code, but I’m getting the following error:

RuntimeError: Event loop is closed
Traceback:
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/streamlit/runtime/scriptrunner/exec_code.py", line 88, in exec_func_with_error_handling
    result = func()
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/streamlit/runtime/scriptrunner/script_runner.py", line 579, in code_to_exec
    exec(code, module.__dict__)
    ~~~~^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/cole.py", line 127, in <module>
    asyncio.run(main())
    ~~~~~~~~~~~^^^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/base_events.py", line 720, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/cole.py", line 123, in main
    await run_agent_with_streaming(user_input)
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/cole.py", line 66, in run_agent_with_streaming
    async with agent.run_stream(
               ~~~~~~~~~~~~~~~~^
        user_input,
        ^^^^^^^^^^^
        # pass entire conversation so far
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
        message_history=st.session_state.messages[:-1],
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ) as result:
    ^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/contextlib.py", line 214, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/pydantic_ai/agent.py", line 421, in run_stream
    async with agent_model.request_stream(messages, model_settings) as model_response:
               ~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/contextlib.py", line 214, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/pydantic_ai/models/gemini.py", line 183, in request_stream
    async with self._make_request(messages, True, model_settings) as http_response:
               ~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/contextlib.py", line 214, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/pydantic_ai/models/gemini.py", line 221, in _make_request
    async with self.http_client.stream(
               ~~~~~~~~~~~~~~~~~~~~~~~^
        'POST',
        ^^^^^^^
    ...<3 lines>...
        timeout=(model_settings or {}).get('timeout', USE_CLIENT_DEFAULT),
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ) as r:
    ^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/contextlib.py", line 214, in __aenter__
    return await anext(self.gen)
           ^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpx/_client.py", line 1628, in stream
    response = await self.send(
               ^^^^^^^^^^^^^^^^
    ...<4 lines>...
    )
    ^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpx/_client.py", line 1674, in send
    response = await self._send_handling_auth(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ...<4 lines>...
    )
    ^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpx/_client.py", line 1702, in _send_handling_auth
    response = await self._send_handling_redirects(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ...<3 lines>...
    )
    ^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpx/_client.py", line 1739, in _send_handling_redirects
    response = await self._send_single_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpx/_client.py", line 1776, in _send_single_request
    response = await transport.handle_async_request(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpx/_transports/default.py", line 377, in handle_async_request
    resp = await self._pool.handle_async_request(req)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpcore/_async/connection_pool.py", line 256, in handle_async_request
    raise exc from None
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpcore/_async/connection_pool.py", line 229, in handle_async_request
    await self._close_connections(closing)
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpcore/_async/connection_pool.py", line 345, in _close_connections
    await connection.aclose()
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpcore/_async/connection.py", line 173, in aclose
    await self._connection.aclose()
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpcore/_async/http11.py", line 258, in aclose
    await self._network_stream.aclose()
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpcore/_backends/anyio.py", line 53, in aclose
    await self._stream.aclose()
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/anyio/streams/tls.py", line 201, in aclose
    await self.transport_stream.aclose()
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/anyio/_backends/_asyncio.py", line 1306, in aclose
    self._transport.close()
    ~~~~~~~~~~~~~~~~~~~~~^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/selector_events.py", line 1202, in close
    super().close()
    ~~~~~~~~~~~~~^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/selector_events.py", line 865, in close
    self._loop.call_soon(self._call_connection_lost, None)
    ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/base_events.py", line 828, in call_soon
    self._check_closed()
    ~~~~~~~~~~~~~~~~~~^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/base_events.py", line 551, in _check_closed
    raise RuntimeError('Event loop is closed')

and the following is my modified code:

from __future__ import annotations
from typing import Literal, TypedDict
import asyncio
import os

import streamlit as st
import httpx

# Import all the message part classes
from pydantic_ai.messages import (
    ModelMessage,
    ModelRequest,
    ModelResponse,
    SystemPromptPart,
    UserPromptPart,
    TextPart,
    ToolCallPart,
    ToolReturnPart,
    RetryPromptPart,
    ModelMessagesTypeAdapter
)
from pydantic_ai.agent import Agent

agent = Agent(
    'gemini-1.5-flash',
    system_prompt='Be concise, reply with one sentence.',
)


class ChatMessage(TypedDict):
    """Format of messages sent to the browser/API."""

    role: Literal['user', 'model']
    timestamp: str
    content: str


def display_message_part(part):
    """
    Display a single part of a message in the Streamlit UI.
    Customize how you display system prompts, user prompts,
    tool calls, tool returns, etc.
    """
    # system-prompt
    if part.part_kind == 'system-prompt':
        with st.chat_message("system"):
            st.markdown(f"**System**: {part.content}")
    # user-prompt
    elif part.part_kind == 'user-prompt':
        with st.chat_message("user"):
            st.markdown(part.content)
    # text
    elif part.part_kind == 'text':
        with st.chat_message("assistant"):
            st.markdown(part.content)


async def run_agent_with_streaming(user_input: str):
    """
    Run the agent with streaming text for the user_input prompt,
    while maintaining the entire conversation in `st.session_state.messages`.
    """

    # Run the agent in a stream

    async with agent.run_stream(
        user_input,
        # pass entire conversation so far
        message_history=st.session_state.messages[:-1],
    ) as result:
        # We'll gather partial text to show incrementally
        partial_text = ""
        message_placeholder = st.empty()
        # Render partial text as it arrives
        async for chunk in result.stream_text(delta=True):
            partial_text += chunk
            message_placeholder.markdown(partial_text)
        # Now that the stream is finished, we have a final result.
        # Add new messages from this run, excluding user-prompt messages
        filtered_messages = [msg for msg in result.new_messages()
                             if not (hasattr(msg, 'parts') and
                                     any(part.part_kind == 'user-prompt' for part in msg.parts))]
        st.session_state.messages.extend(filtered_messages)
        # Add the final response to the messages
        st.session_state.messages.append(
            ModelResponse(parts=[TextPart(content=partial_text)])
        )


async def main():
    st.title("Pydantic AI Agents")

    # Initialize chat history in session state if not present
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Display all messages from the conversation so far
    # Each message is either a ModelRequest or ModelResponse.
    # We iterate over their parts to decide how to display them.
    for msg in st.session_state.messages:
        if isinstance(msg, (ModelRequest, ModelResponse)):
            for part in msg.parts:
                display_message_part(part)

    # Chat input for the user
    user_input = st.chat_input(
        "What would you like to know about this repository?")

    if user_input:

        # We append a new request to the conversation explicitly
        st.session_state.messages.append(
            ModelRequest(parts=[UserPromptPart(content=user_input)])
        )

        # Display user prompt in the UI
        with st.chat_message("user"):
            st.markdown(user_input)

        # Display the assistant's partial response while streaming
        with st.chat_message("assistant"):
            # Actually run the agent now, streaming the text
            await run_agent_with_streaming(user_input)


if __name__ == "__main__":
    asyncio.run(main())

Hmmm, it looks like you are missing some things, like the deps parameter in the run_stream call. Could you double-check and make sure you are including that?

I’m not using deps. It’s a plain agent without any deps or tools.

1 Like

Gotcha! I would try with a different LLM then! Could you try with Gemini 2.0 Flash instead of 1.5?
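
For reference, that swap is a one-line change to the model string in the Agent constructor. The exact identifier depends on the installed Pydantic AI version; around 0.0.17 the 2.0 model was exposed under an experimental name, so treat the string below as an assumption and check the model names your version accepts:

# Sketch: the same minimal agent, pointed at Gemini 2.0 Flash instead of 1.5.
# 'gemini-2.0-flash-exp' is the name used around pydantic_ai 0.0.17;
# newer releases may expose it as 'gemini-2.0-flash'.
agent = Agent(
    'gemini-2.0-flash-exp',
    system_prompt='Be concise, reply with one sentence.',
)

Separately (this was not suggested in the thread), the "Event loop is closed" error is often a symptom of how Streamlit and asyncio interact: Streamlit re-runs the whole script on every interaction, and asyncio.run() creates and then closes a brand-new event loop each time, so anything cached between reruns (such as a shared HTTP client inside the model) can end up bound to a loop that no longer exists. One workaround some people use is to keep a single loop alive in session state instead of calling asyncio.run(); a minimal sketch, reusing the imports and main() from the script above (the helper name is made up for the example):

# Sketch: reuse one event loop across Streamlit reruns instead of asyncio.run().
def get_session_event_loop() -> asyncio.AbstractEventLoop:
    if "event_loop" not in st.session_state:
        st.session_state.event_loop = asyncio.new_event_loop()
    return st.session_state.event_loop

if __name__ == "__main__":
    loop = get_session_event_loop()
    loop.run_until_complete(main())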