I was going to try to test my agent with a streamlit interface but pydantic ai is oddly difficult to work with in terms of responses. Has anyone found a good reference?
@kai-feinberg I actually have the PERFECT thing for you! I recently created a Streamlit interface to work with the GitHub agent I’ve been building on my channel. I’ll paste the code in here. It takes care of a ton including:
- Managing conversation history in the Streamlit state
- Converting the Pydantic AI message format into what gets displayed in Streamlit
- Streaming the response to the frontend instead of just dumping the final response all at once
- Storing tool calls and responses in the conversation history
You’ll just have to change the GitHub agent import and usage to your agent!
Code:
from __future__ import annotations
from typing import Literal, TypedDict
import asyncio
import os
import streamlit as st
import httpx
import json
import logfire
# Import all the message part classes
from pydantic_ai.messages import (
ModelMessage,
ModelRequest,
ModelResponse,
SystemPromptPart,
UserPromptPart,
TextPart,
ToolCallPart,
ToolReturnPart,
RetryPromptPart,
ModelMessagesTypeAdapter
)
from github_agent_ai import github_agent, Deps
# Load environment variables if needed
from dotenv import load_dotenv
load_dotenv()
# Configure logfire to suppress warnings (optional)
logfire.configure(send_to_logfire='never')
class ChatMessage(TypedDict):
"""Format of messages sent to the browser/API."""
role: Literal['user', 'model']
timestamp: str
content: str
def display_message_part(part):
"""
Display a single part of a message in the Streamlit UI.
Customize how you display system prompts, user prompts,
tool calls, tool returns, etc.
"""
# system-prompt
if part.part_kind == 'system-prompt':
with st.chat_message("system"):
st.markdown(f"**System**: {part.content}")
# user-prompt
elif part.part_kind == 'user-prompt':
with st.chat_message("user"):
st.markdown(part.content)
# text
elif part.part_kind == 'text':
with st.chat_message("assistant"):
st.markdown(part.content)
async def run_agent_with_streaming(user_input: str, github_url: str | None = None):
"""
Run the agent with streaming text for the user_input prompt,
while maintaining the entire conversation in `st.session_state.messages`.
"""
# Prepare dependencies
openai_api_key = os.getenv('OPENAI_API_KEY')
github_token = os.getenv('GITHUB_TOKEN')
deps = Deps(
client=httpx.AsyncClient(),
openai_api_key=openai_api_key,
github_token=github_token,
)
# If you want to prepend the GitHub URL to the user's prompt:
if github_url:
user_input = f"For the GitHub repository {github_url}, {user_input}"
# Run the agent in a stream
try:
async with github_agent.run_stream(
user_input,
deps=deps,
message_history= st.session_state.messages[:-1], # pass entire conversation so far
) as result:
# We'll gather partial text to show incrementally
partial_text = ""
message_placeholder = st.empty()
# Render partial text as it arrives
async for chunk in result.stream_text(delta=True):
partial_text += chunk
message_placeholder.markdown(partial_text)
# Now that the stream is finished, we have a final result.
# Add new messages from this run, excluding user-prompt messages
filtered_messages = [msg for msg in result.new_messages()
if not (hasattr(msg, 'parts') and
any(part.part_kind == 'user-prompt' for part in msg.parts))]
st.session_state.messages.extend(filtered_messages)
# Add the final response to the messages
st.session_state.messages.append(
ModelResponse(parts=[TextPart(content=partial_text)])
)
finally:
await deps.client.aclose()
async def main():
st.title("GitHub Repository Analyzer")
st.write("Ask questions about any GitHub repository!")
# Let the user provide a GitHub URL
github_url = st.text_input("Enter GitHub Repository URL:", key="github_url")
# Initialize chat history in session state if not present
if "messages" not in st.session_state:
st.session_state.messages = []
# Display all messages from the conversation so far
# Each message is either a ModelRequest or ModelResponse.
# We iterate over their parts to decide how to display them.
for msg in st.session_state.messages:
if isinstance(msg, ModelRequest) or isinstance(msg, ModelResponse):
for part in msg.parts:
display_message_part(part)
# Chat input for the user
user_input = st.chat_input("What would you like to know about this repository?")
if user_input:
# Must have a GitHub URL
if not github_url:
st.error("Please enter a GitHub repository URL first.")
return
# We append a new request to the conversation explicitly
st.session_state.messages.append(
ModelRequest(parts=[UserPromptPart(content=user_input)])
)
# Display user prompt in the UI
with st.chat_message("user"):
st.markdown(user_input)
# Display the assistant's partial response while streaming
with st.chat_message("assistant"):
# Actually run the agent now, streaming the text
await run_agent_with_streaming(user_input, github_url=github_url)
if __name__ == "__main__":
asyncio.run(main())
Awesome! Thank you so much Cole. I’ll give it a shot. What version of pydantic does this use (I think currently most updated is 0.17)?
You are so welcome!
This is using the most recent version! 0.0.17.
Just got it up and running! This is absolutely perfect. Thank you again Cole!
Awesome - you are so welcome!
Hello @ColeMedin I tried your code, but I’m getting the following error
RuntimeError: Event loop is closed
Traceback:
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/streamlit/runtime/scriptrunner/exec_code.py", line 88, in exec_func_with_error_handling
result = func()
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/streamlit/runtime/scriptrunner/script_runner.py", line 579, in code_to_exec
exec(code, module.__dict__)
~~~~^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/cole.py", line 127, in <module>
asyncio.run(main())
~~~~~~~~~~~^^^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py", line 194, in run
return runner.run(main)
~~~~~~~~~~^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py", line 118, in run
return self._loop.run_until_complete(task)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/base_events.py", line 720, in run_until_complete
return future.result()
~~~~~~~~~~~~~^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/cole.py", line 123, in main
await run_agent_with_streaming(user_input)
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/cole.py", line 66, in run_agent_with_streaming
async with agent.run_stream(
~~~~~~~~~~~~~~~~^
user_input,
^^^^^^^^^^^
# pass entire conversation so far
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
message_history=st.session_state.messages[:-1],
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
) as result:
^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/contextlib.py", line 214, in __aenter__
return await anext(self.gen)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/pydantic_ai/agent.py", line 421, in run_stream
async with agent_model.request_stream(messages, model_settings) as model_response:
~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/contextlib.py", line 214, in __aenter__
return await anext(self.gen)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/pydantic_ai/models/gemini.py", line 183, in request_stream
async with self._make_request(messages, True, model_settings) as http_response:
~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/contextlib.py", line 214, in __aenter__
return await anext(self.gen)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/pydantic_ai/models/gemini.py", line 221, in _make_request
async with self.http_client.stream(
~~~~~~~~~~~~~~~~~~~~~~~^
'POST',
^^^^^^^
...<3 lines>...
timeout=(model_settings or {}).get('timeout', USE_CLIENT_DEFAULT),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
) as r:
^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/contextlib.py", line 214, in __aenter__
return await anext(self.gen)
^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpx/_client.py", line 1628, in stream
response = await self.send(
^^^^^^^^^^^^^^^^
...<4 lines>...
)
^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpx/_client.py", line 1674, in send
response = await self._send_handling_auth(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
...<4 lines>...
)
^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpx/_client.py", line 1702, in _send_handling_auth
response = await self._send_handling_redirects(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
...<3 lines>...
)
^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpx/_client.py", line 1739, in _send_handling_redirects
response = await self._send_single_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpx/_client.py", line 1776, in _send_single_request
response = await transport.handle_async_request(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpx/_transports/default.py", line 377, in handle_async_request
resp = await self._pool.handle_async_request(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpcore/_async/connection_pool.py", line 256, in handle_async_request
raise exc from None
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpcore/_async/connection_pool.py", line 229, in handle_async_request
await self._close_connections(closing)
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpcore/_async/connection_pool.py", line 345, in _close_connections
await connection.aclose()
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpcore/_async/connection.py", line 173, in aclose
await self._connection.aclose()
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpcore/_async/http11.py", line 258, in aclose
await self._network_stream.aclose()
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/httpcore/_backends/anyio.py", line 53, in aclose
await self._stream.aclose()
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/anyio/streams/tls.py", line 201, in aclose
await self.transport_stream.aclose()
File "/Users/greathayat/Documents/playgrounds/python-stuff/ai-agents/venv/lib/python3.13/site-packages/anyio/_backends/_asyncio.py", line 1306, in aclose
self._transport.close()
~~~~~~~~~~~~~~~~~~~~~^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/selector_events.py", line 1202, in close
super().close()
~~~~~~~~~~~~~^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/selector_events.py", line 865, in close
self._loop.call_soon(self._call_connection_lost, None)
~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/base_events.py", line 828, in call_soon
self._check_closed()
~~~~~~~~~~~~~~~~~~^^
File "/opt/homebrew/Cellar/python@3.13/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/base_events.py", line 551, in _check_closed
raise RuntimeError('Event loop is closed')
and following is my modified code
from __future__ import annotations
from typing import Literal, TypedDict
import asyncio
import os
import streamlit as st
import httpx
# Import all the message part classes
from pydantic_ai.messages import (
ModelMessage,
ModelRequest,
ModelResponse,
SystemPromptPart,
UserPromptPart,
TextPart,
ToolCallPart,
ToolReturnPart,
RetryPromptPart,
ModelMessagesTypeAdapter
)
from pydantic_ai.agent import Agent
agent = Agent(
'gemini-1.5-flash',
system_prompt='Be concise, reply with one sentence.',
)
class ChatMessage(TypedDict):
"""Format of messages sent to the browser/API."""
role: Literal['user', 'model']
timestamp: str
content: str
def display_message_part(part):
"""
Display a single part of a message in the Streamlit UI.
Customize how you display system prompts, user prompts,
tool calls, tool returns, etc.
"""
# system-prompt
if part.part_kind == 'system-prompt':
with st.chat_message("system"):
st.markdown(f"**System**: {part.content}")
# user-prompt
elif part.part_kind == 'user-prompt':
with st.chat_message("user"):
st.markdown(part.content)
# text
elif part.part_kind == 'text':
with st.chat_message("assistant"):
st.markdown(part.content)
async def run_agent_with_streaming(user_input: str):
"""
Run the agent with streaming text for the user_input prompt,
while maintaining the entire conversation in `st.session_state.messages`.
"""
# Run the agent in a stream
async with agent.run_stream(
user_input,
# pass entire conversation so far
message_history=st.session_state.messages[:-1],
) as result:
# We'll gather partial text to show incrementally
partial_text = ""
message_placeholder = st.empty()
# Render partial text as it arrives
async for chunk in result.stream_text(delta=True):
partial_text += chunk
message_placeholder.markdown(partial_text)
# Now that the stream is finished, we have a final result.
# Add new messages from this run, excluding user-prompt messages
filtered_messages = [msg for msg in result.new_messages()
if not (hasattr(msg, 'parts') and
any(part.part_kind == 'user-prompt' for part in msg.parts))]
st.session_state.messages.extend(filtered_messages)
# Add the final response to the messages
st.session_state.messages.append(
ModelResponse(parts=[TextPart(content=partial_text)])
)
async def main():
st.title("Pydantic AI Agents")
# Initialize chat history in session state if not present
if "messages" not in st.session_state:
st.session_state.messages = []
# Display all messages from the conversation so far
# Each message is either a ModelRequest or ModelResponse.
# We iterate over their parts to decide how to display them.
for msg in st.session_state.messages:
if isinstance(msg, ModelRequest) or isinstance(msg, ModelResponse):
for part in msg.parts:
display_message_part(part)
# Chat input for the user
user_input = st.chat_input(
"What would you like to know about this repository?")
if user_input:
# We append a new request to the conversation explicitly
st.session_state.messages.append(
ModelRequest(parts=[UserPromptPart(content=user_input)])
)
# Display user prompt in the UI
with st.chat_message("user"):
st.markdown(user_input)
# Display the assistant's partial response while streaming
with st.chat_message("assistant"):
# Actually run the agent now, streaming the text
await run_agent_with_streaming(user_input)
if __name__ == "__main__":
asyncio.run(main())
Hmmm it looks like you are missing some things like the deps parameter in the run_stream call. Could you double check and make sure you are including that?
I’m not using the deps
. Its a plain agent without any deps and tools.
Gotcha! I would try with a different LLM then! Could you try with Gemini 2.0 Flash instead of 1.5?