I tried using the function in the open web ui to integrate with n8n webhook. I can pass the chat message, but unfortunately, I am not able to pass the file. The file gets embedded and stored in the open web ui after that it doesn’t give a response, nor is the file passed to n8n.
Can someone guide or help with it would be great and helpul.
from typing import Optional, Callable, Awaitable
from pydantic import BaseModel, Field
import os
import time
import requests
from io import BytesIO
import logging
import json
# Setup logging to file
logging.basicConfig(
filename="app.log",
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(message)s",
)
class Pipe:
class Valves(BaseModel):
n8n_url: str = Field(
default="http://localhost:5678/workflow/code_review"
)
n8n_bearer_token: str = Field(default="...")
input_field: str = Field(default="chatInput")
response_field: str = Field(default="output")
emit_interval: float = Field(
default=2.0, description="Interval in seconds between status emissions"
)
enable_status_indicator: bool = Field(
default=True, description="Enable or disable status indicator emissions"
)
upload_url: str = Field(
default="http://localhost:3000/upload",
description="Open Web UI file upload endpoint",
)
def __init__(self):
self.type = "pipe"
self.id = "code_review"
self.name = "code_review"
self.valves = self.Valves()
self.last_emit_time = 0
async def emit_status(
self,
__event_emitter__: Callable[[dict], Awaitable[None]],
level: str,
message: str,
done: bool,
):
current_time = time.time()
if (
__event_emitter__
and self.valves.enable_status_indicator
and (
current_time - self.last_emit_time >= self.valves.emit_interval or done
)
):
await __event_emitter__(
{
"type": "status",
"data": {
"status": "complete" if done else "in_progress",
"level": level,
"description": message,
"done": done,
},
}
)
self.last_emit_time = current_time
async def pipe(
self,
body: dict,
__user__: Optional[dict] = None,
__event_emitter__: Callable[[dict], Awaitable[None]] = None,
__event_call__: Callable[[dict], Awaitable[dict]] = None,
) -> Optional[dict]:
await self.emit_status(
__event_emitter__, "info", "Calling N8N Workflow with Files...", False
)
messages = body.get("messages", [])
uploaded_files = body.get("files", [])
# Debug: Write messages to a text file
try:
with open(r"C:\Users\cbabu\Desktop\app.txt", "a", encoding="utf-8") as file:
file.write(
json.dumps(body) + "\n"
) # Use json.dumps to convert dict to string
except Exception as file_error:
logging.error(f"Failed to write to file: {file_error}")
if messages:
question = messages[-1]["content"]
if "Prompt: " in question:
question = question.split("Prompt: ")[-1]
try:
task_id = body.get("taskId", "unknown")
uploaded_file_info = []
for file in uploaded_files:
filename = file.get("filename", "unknown")
file_content = file.get("content")
mimetype = file.get("mimetype", "application/octet-stream")
if file_content:
# Convert content to file-like object for upload
file_data = BytesIO(file_content)
files = {"file": (filename, file_data, mimetype)}
# Post file to Open Web UI's upload endpoint
upload_response = requests.post(
self.valves.upload_url, files=files
)
if upload_response.status_code == 200:
file_info = upload_response.json()
uploaded_file_info.append(file_info)
logging.info(f"Uploaded file info: {file_info}")
else:
err_msg = f"Failed to upload file {filename}: {upload_response.status_code} - {upload_response.text}"
logging.error(err_msg)
raise Exception(err_msg)
session_id = f"{__user__.get('id', 'unknown')} - {messages[0]['content'].split('Prompt: ')[-1][:100]}"
payload = {
"sessionId": session_id,
"taskId": task_id,
"uploaded_files": json.dumps(uploaded_file_info),
}
payload[self.valves.input_field] = question
logging.info(f"Payload sent to n8n: {payload}")
headers = {
"Authorization": f"Bearer {self.valves.n8n_bearer_token}",
}
# Send POST request to n8n webhook
response = requests.post(
self.valves.n8n_url,
data=payload,
headers=headers,
)
if response.status_code == 200:
n8n_response = response.json().get(self.valves.response_field, "")
messages.append({"role": "assistant", "content": n8n_response})
else:
raise Exception(
f"Error from n8n: {response.status_code} - {response.text}"
)
except Exception as e:
error_message = f"Error during sequence execution: {str(e)}"
logging.error(error_message)
await self.emit_status(__event_emitter__, "error", error_message, True)
return {"error": str(e)}
else:
error_message = "No messages found in the request body"
logging.warning(error_message)
await self.emit_status(__event_emitter__, "error", error_message, True)
messages.append({"role": "assistant", "content": error_message})
await self.emit_status(__event_emitter__, "info", "Complete", True)
return {"messages": messages}