Files
letta-server/memgpt/server/rest_api/interface.py
Charles Packer 2048ba179b added memgpt server command (#611)
* added memgpt server command

* added the option to specify a port (rest default 8283, ws default 8282)

* fixed import in test

* added agent saving on shutdown

* added basic locking mechanism (assumes only one server.py is running at the same time)

* remove 'STOP' from buffer when converting to list for the non-streaming POST resposne

* removed duplicate on_event (redundant to lifespan)

* added GET agents/memory route

* added GET agent config

* added GET server config

* added PUT route for modifying agent core memory

* refactored to put server loop in separate function called via main
2023-12-13 00:41:40 -08:00

78 lines
2.3 KiB
Python

import asyncio
import queue
from memgpt.interface import AgentInterface
class QueuingInterface(AgentInterface):
"""Messages are queued inside an internal buffer and manually flushed"""
def __init__(self):
self.buffer = queue.Queue()
def to_list(self):
"""Convert queue to a list (empties it out at the same time)"""
items = []
while not self.buffer.empty():
try:
items.append(self.buffer.get_nowait())
except queue.Empty:
break
if len(items) > 1 and items[-1] == "STOP":
items.pop()
return items
def clear(self):
"""Clear all messages from the queue."""
with self.buffer.mutex:
# Empty the queue
self.buffer.queue.clear()
async def message_generator(self):
while True:
if not self.buffer.empty():
message = self.buffer.get()
if message == "STOP":
break
yield message
else:
await asyncio.sleep(0.1) # Small sleep to prevent a busy loop
def step_yield(self):
"""Enqueue a special stop message"""
self.buffer.put("STOP")
def user_message(self, msg: str):
"""Handle reception of a user message"""
pass
def internal_monologue(self, msg: str) -> None:
"""Handle the agent's internal monologue"""
print(msg)
self.buffer.put({"internal_monologue": msg})
def assistant_message(self, msg: str) -> None:
"""Handle the agent sending a message"""
print(msg)
self.buffer.put({"assistant_message": msg})
def function_message(self, msg: str) -> None:
"""Handle the agent calling a function"""
print(msg)
if msg.startswith("Running "):
msg = msg.replace("Running ", "")
self.buffer.put({"function_call": msg})
elif msg.startswith("Success: "):
msg = msg.replace("Success: ", "")
self.buffer.put({"function_return": msg, "status": "success"})
elif msg.startswith("Error: "):
msg = msg.replace("Error: ", "")
self.buffer.put({"function_return": msg, "status": "error"})
else:
# NOTE: generic, should not happen
self.buffer.put({"function_message": msg})