letta-server/memgpt/server/rest_api/agents/message.py
Robin Goetz 39ada91fe7 feat: next iteration of chat ui (#637)
* feat: add loading indicator when creating new agent

* feat: reorder front page to avoid overflow and always show add button

* feat: display function calls

* feat: set up proxy during development & remove explicit inclusion of host/port in backend calls

* fix: introduce api prefix, split up fastapi server to become more modular, use app directly instead of subprocess

the api prefix allows us to create a proxy for frontend development that relays all /api
requests to our fastapi server, while serving the development files for other paths.
splitting up the fastapi server will let us branch out and divide up the work better
in the future. using the application directly in our cli instead of a subprocess makes
debugging possible in development, and overall this python-native way just seems cleaner.
we can discuss whether we should keep the api prefix or distinguish between a REST-only
mode and one that also serves the static files for the GUI.
This is just my initial take on things.

* chore: build latest frontend
2024-01-11 14:47:51 +01:00


import asyncio
import json

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from starlette.responses import StreamingResponse

from memgpt.server.rest_api.interface import QueuingInterface
from memgpt.server.server import SyncServer

router = APIRouter()


class UserMessage(BaseModel):
    user_id: str
    agent_id: str
    message: str
    stream: bool = False


def setup_agents_message_router(server: SyncServer, interface: QueuingInterface):
    @router.post("/agents/message")
    async def user_message(body: UserMessage):
        if body.stream:
            # Streaming response: kick off generation, then relay the
            # interface's queued messages to the client as server-sent events.
            try:
                if asyncio.iscoroutinefunction(server.user_message):
                    # Schedule the async call as a background task; do NOT await it,
                    # or the response cannot start streaming until generation finishes.
                    asyncio.create_task(server.user_message(user_id=body.user_id, agent_id=body.agent_id, message=body.message))
                else:
                    # Run the synchronous function in a thread pool so the event loop stays free
                    loop = asyncio.get_running_loop()
                    loop.run_in_executor(None, server.user_message, body.user_id, body.agent_id, body.message)

                async def formatted_message_generator():
                    # Drain the interface queue, emitting each message as an SSE "data:" frame
                    async for message in interface.message_generator():
                        yield f"data: {json.dumps(message)}\n\n"
                        await asyncio.sleep(1)

                # Return the streaming response using the generator
                return StreamingResponse(formatted_message_generator(), media_type="text/event-stream")
            except HTTPException:
                raise
            except Exception as e:
                raise HTTPException(status_code=500, detail=f"{e}")
        else:
            # Non-streaming: run the step synchronously, then return everything at once
            interface.clear()
            try:
                server.user_message(user_id=body.user_id, agent_id=body.agent_id, message=body.message)
            except HTTPException:
                raise
            except Exception as e:
                raise HTTPException(status_code=500, detail=f"{e}")
            return {"messages": interface.to_list()}

    return router
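On the wire, the streaming branch above emits plain server-sent events: one `data:` line holding a JSON message, followed by a blank line. A client can recover the messages with a small parser like this sketch (the helper name and the sample payloads are made up for illustration; real messages come from `QueuingInterface`):

```python
import json

def parse_sse_events(raw: str):
    """Collect the JSON payloads from a text/event-stream body."""
    events = []
    for block in raw.split("\n\n"):
        for line in block.splitlines():
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events

# Example payload shaped like the generator's output
raw = 'data: {"internal_monologue": "thinking"}\n\ndata: {"assistant_message": "hi"}\n\n'
print(parse_sse_events(raw))
```

In a browser the same format is consumed natively via `EventSource`, which is why `media_type="text/event-stream"` is used on the response.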