Issue passing a file from Open WebUI to an n8n webhook

I tried using a function in Open WebUI to integrate with an n8n webhook. I can pass the chat message, but unfortunately I am not able to pass the file. The file gets embedded and stored in Open WebUI, but after that it doesn’t give a response, nor is the file passed to n8n.

If someone could guide me or help with this, it would be great and helpful.

from typing import Optional, Callable, Awaitable
from pydantic import BaseModel, Field
import os
import time
import requests
from io import BytesIO
import logging
import json

# Send log records of level DEBUG and above to "app.log", one line per record
# in the form "<timestamp> - <level> - <message>".
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=_LOG_FORMAT, level=logging.DEBUG, filename="app.log")


class Pipe:
    """Pipe that forwards the latest chat message, plus any attached files, to an n8n webhook.

    For each request the pipe:
      1. uploads every entry of ``body["files"]`` that carries content to
         ``valves.upload_url`` and collects the JSON responses,
      2. posts the question, a session id, the task id and the collected
         upload info (as a JSON string) to ``valves.n8n_url``,
      3. appends the webhook's answer to the message list as an assistant
         message.
    """

    class Valves(BaseModel):
        # Settings for the pipe; every field has a default.
        n8n_url: str = Field(
            default="http://localhost:5678/workflow/code_review"
        )
        n8n_bearer_token: str = Field(default="...")
        # Form field under which the question is sent to n8n.
        input_field: str = Field(default="chatInput")
        # Key of the n8n JSON response that holds the answer text.
        response_field: str = Field(default="output")
        emit_interval: float = Field(
            default=2.0, description="Interval in seconds between status emissions"
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions"
        )
        upload_url: str = Field(
            default="http://localhost:3000/upload",
            description="Open Web UI file upload endpoint",
        )
        request_timeout: float = Field(
            default=120.0,
            description="Timeout in seconds applied to each outgoing HTTP request",
        )

    def __init__(self):
        self.type = "pipe"
        self.id = "code_review"
        self.name = "code_review"
        self.valves = self.Valves()
        # Timestamp (time.time()) of the last status event that was emitted.
        self.last_emit_time = 0

    async def emit_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        level: str,
        message: str,
        done: bool,
    ):
        """Emit a ``status`` event, rate-limited by ``valves.emit_interval``.

        Nothing is emitted when no emitter is given or when the status
        indicator is disabled. Events with ``done=True`` bypass the rate
        limit so the final status is always delivered.

        Args:
            __event_emitter__: Async callable receiving the event dict; may be falsy.
            level: Severity label copied into the event (e.g. "info", "error").
            message: Human-readable description copied into the event.
            done: Whether this is the final status of the request.
        """
        current_time = time.time()
        if (
            __event_emitter__
            and self.valves.enable_status_indicator
            and (
                current_time - self.last_emit_time >= self.valves.emit_interval or done
            )
        ):
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "status": "complete" if done else "in_progress",
                        "level": level,
                        "description": message,
                        "done": done,
                    },
                }
            )
            self.last_emit_time = current_time

    def _upload_files(self, uploaded_files: list) -> list:
        """Upload each file entry that has content and return the upload responses.

        NOTE(review): this assumes every entry is a dict exposing "filename",
        "content" and "mimetype" at its top level -- verify against the actual
        request body. Entries without content are skipped, with a warning so
        the skip is visible in the log.

        Raises:
            Exception: if the upload endpoint answers with a status other than 200.
        """
        uploaded_file_info = []
        for entry in uploaded_files:
            filename = entry.get("filename", "unknown")
            file_content = entry.get("content")
            mimetype = entry.get("mimetype", "application/octet-stream")

            if not file_content:
                logging.warning(
                    f"Skipping file {filename}: no content found in request body"
                )
                continue

            # BytesIO only accepts bytes-like data; text content must be encoded first.
            if isinstance(file_content, str):
                file_content = file_content.encode("utf-8")

            # Convert content to file-like object for upload
            file_data = BytesIO(file_content)
            files = {"file": (filename, file_data, mimetype)}
            # Post file to the configured upload endpoint; the timeout keeps an
            # unresponsive endpoint from blocking the request forever.
            upload_response = requests.post(
                self.valves.upload_url,
                files=files,
                timeout=self.valves.request_timeout,
            )

            if upload_response.status_code == 200:
                file_info = upload_response.json()
                uploaded_file_info.append(file_info)
                logging.info(f"Uploaded file info: {file_info}")
            else:
                err_msg = f"Failed to upload file {filename}: {upload_response.status_code} - {upload_response.text}"
                logging.error(err_msg)
                raise Exception(err_msg)

        return uploaded_file_info

    async def pipe(
        self,
        body: dict,
        __user__: Optional[dict] = None,
        __event_emitter__: Optional[Callable[[dict], Awaitable[None]]] = None,
        __event_call__: Optional[Callable[[dict], Awaitable[dict]]] = None,
    ) -> Optional[dict]:
        """Send the last chat message and any attached files to the n8n webhook.

        Args:
            body: Request body; "messages", "files" and "taskId" are read from it.
            __user__: Optional user dict; its "id" becomes part of the session id.
            __event_emitter__: Optional async callable used for status events.
            __event_call__: Unused by this pipe.

        Returns:
            ``{"messages": messages}`` with the assistant reply appended (or an
            explanatory assistant message when the body holds no messages), or
            ``{"error": "<text>"}`` when uploading or calling n8n fails.
        """
        await self.emit_status(
            __event_emitter__, "info", "Calling N8N Workflow with Files...", False
        )

        # An explicit None for either key is treated like a missing key.
        messages = body.get("messages")
        if messages is None:
            messages = []
        uploaded_files = body.get("files") or []

        # Debug: Write messages to a text file
        try:
            with open(r"C:\Users\cbabu\Desktop\app.txt", "a", encoding="utf-8") as file:
                file.write(
                    json.dumps(body) + "\n"
                )  # Use json.dumps to convert dict to string
        except Exception as file_error:
            logging.error(f"Failed to write to file: {file_error}")

        if messages:
            try:
                # Kept inside the try so a malformed message is reported as an
                # error status instead of escaping as an unhandled exception.
                question = messages[-1]["content"]
                if "Prompt: " in question:
                    question = question.split("Prompt: ")[-1]

                task_id = body.get("taskId", "unknown")
                uploaded_file_info = self._upload_files(uploaded_files)

                # __user__ defaults to None, so fall back to an empty dict.
                user_id = (__user__ or {}).get("id", "unknown")
                session_id = f"{user_id} - {messages[0]['content'].split('Prompt: ')[-1][:100]}"
                payload = {
                    "sessionId": session_id,
                    "taskId": task_id,
                    "uploaded_files": json.dumps(uploaded_file_info),
                }
                payload[self.valves.input_field] = question

                logging.info(f"Payload sent to n8n: {payload}")

                headers = {
                    "Authorization": f"Bearer {self.valves.n8n_bearer_token}",
                }

                # Send POST request to n8n webhook (form-encoded via data=).
                response = requests.post(
                    self.valves.n8n_url,
                    data=payload,
                    headers=headers,
                    timeout=self.valves.request_timeout,
                )

                if response.status_code == 200:
                    n8n_response = response.json().get(self.valves.response_field, "")
                    messages.append({"role": "assistant", "content": n8n_response})
                else:
                    raise Exception(
                        f"Error from n8n: {response.status_code} - {response.text}"
                    )

            except Exception as e:
                error_message = f"Error during sequence execution: {str(e)}"
                logging.error(error_message)
                await self.emit_status(__event_emitter__, "error", error_message, True)
                return {"error": str(e)}
        else:
            error_message = "No messages found in the request body"
            logging.warning(error_message)
            await self.emit_status(__event_emitter__, "error", error_message, True)
            messages.append({"role": "assistant", "content": error_message})

        await self.emit_status(__event_emitter__, "info", "Complete", True)
        return {"messages": messages}

1 Like

At this time the pipeline doesn’t support files, unfortunately, since from what I can tell n8n doesn’t have the best support for passing images into workflows. This would be a good future enhancement, though!

Is there any other user interface on the market that I can leverage?

You can use an AI coding assistant to create a React or Streamlit UI really easily! AnythingLLM is another one to look into.

Thank you, this is really helpful.

1 Like

You are so welcome! :smiley: