diff --git a/configs/server_config.yaml b/configs/server_config.yaml index a2b27ae8..8d9c7195 100644 --- a/configs/server_config.yaml +++ b/configs/server_config.yaml @@ -32,5 +32,8 @@ type = postgres path = /root/.memgpt uri = postgresql+pg8000://memgpt:memgpt@pgvector_db:5432/memgpt +[version] +memgpt_version = 0.4.0 + [client] anon_clientid = 00000000-0000-0000-0000-000000000000 diff --git a/locust_test.py b/locust_test.py index e7418940..015ab266 100644 --- a/locust_test.py +++ b/locust_test.py @@ -21,18 +21,20 @@ class MemGPTUser(HttpUser): # Create a user and get the token self.client.headers = {"Authorization": "Bearer password"} user_data = {"name": f"User-{''.join(random.choices(string.ascii_lowercase + string.digits, k=8))}"} - response = self.client.post("/admin/users", json=user_data) + response = self.client.post("/v1/admin/users", json=user_data) response_json = response.json() print(response_json) self.user_id = response_json["id"] # create a token - response = self.client.post("/admin/users/keys", json={"user_id": self.user_id}) + response = self.client.post("/v1/admin/users/keys", json={"user_id": self.user_id}) self.token = response.json()["key"] # reset to use user token as headers self.client.headers = {"Authorization": f"Bearer {self.token}"} + # @task(1) + # def create_agent(self): # generate random name name = "".join(random.choices(string.ascii_lowercase + string.digits, k=8)) request = CreateAgent( @@ -42,7 +44,7 @@ class MemGPTUser(HttpUser): ) # create an agent - with self.client.post("/api/agents", json=request.model_dump(), headers=self.client.headers, catch_response=True) as response: + with self.client.post("/v1/agents", json=request.model_dump(), headers=self.client.headers, catch_response=True) as response: if response.status_code != 200: response.failure(f"Failed to create agent: {response.text}") @@ -57,10 +59,10 @@ class MemGPTUser(HttpUser): request = MemGPTRequest(messages=messages, stream_steps=False, stream_tokens=False, return_message_object=False) with self.client.post( - f"/api/agents/{self.agent_id}/messages", json=request.model_dump(), headers=self.client.headers, catch_response=True + f"/v1/agents/{self.agent_id}/messages", json=request.model_dump(), headers=self.client.headers, catch_response=True ) as response: if response.status_code != 200: - response.failure(f"Failed to send message: {response.text}") + response.failure(f"Failed to send message {response.status_code}: {response.text}") response = MemGPTResponse(**response.json()) print("Response", response.usage) diff --git a/memgpt/agent_store/db.py b/memgpt/agent_store/db.py index 4655c22e..81f655ce 100644 --- a/memgpt/agent_store/db.py +++ b/memgpt/agent_store/db.py @@ -13,13 +13,12 @@ from sqlalchemy import ( TypeDecorator, and_, asc, - create_engine, desc, or_, select, text, ) -from sqlalchemy.orm import declarative_base, mapped_column, sessionmaker +from sqlalchemy.orm import declarative_base, mapped_column from sqlalchemy.orm.session import close_all_sessions from sqlalchemy.sql import func from sqlalchemy_json import MutableJson @@ -36,6 +35,9 @@ from memgpt.schemas.openai.chat_completions import ToolCall from memgpt.schemas.passage import Passage from memgpt.settings import settings +Base = declarative_base() +config = MemGPTConfig() + class CommonVector(TypeDecorator): """Common type for representing vectors in SQLite""" @@ -66,149 +68,116 @@ class CommonVector(TypeDecorator): return np.frombuffer(value, dtype=np.float32) -# Custom serialization / de-serialization for JSON columns +class MessageModel(Base): + """Defines data model for storing Message objects""" + + __tablename__ = "messages" + __table_args__ = {"extend_existing": True} + + # Assuming message_id is the primary key + id = Column(String, primary_key=True) + user_id = Column(String, nullable=False) + agent_id = Column(String, nullable=False) + + # openai info + role = Column(String, nullable=False) + text = Column(String) # optional: can be null if function call + model = Column(String) # optional: can be null if LLM backend doesn't require specifying + name = Column(String) # optional: multi-agent only + + # tool call request info + # if role == "assistant", this MAY be specified + # if role != "assistant", this must be null + # TODO align with OpenAI spec of multiple tool calls + # tool_calls = Column(ToolCallColumn) + tool_calls = Column(ToolCallColumn) + + # tool call response info + # if role == "tool", then this must be specified + # if role != "tool", this must be null + tool_call_id = Column(String) + + # Add a datetime column, with default value as the current time + created_at = Column(DateTime(timezone=True)) + Index("message_idx_user", user_id, agent_id), + + def __repr__(self): + return f"" + + def to_record(self): + # calls = ( + # [ToolCall(id=tool_call["id"], function=ToolCallFunction(**tool_call["function"])) for tool_call in self.tool_calls] + # if self.tool_calls + # else None + # ) + # if calls: + # assert isinstance(calls[0], ToolCall) + if self.tool_calls and len(self.tool_calls) > 0: + assert isinstance(self.tool_calls[0], ToolCall), type(self.tool_calls[0]) + for tool in self.tool_calls: + assert isinstance(tool, ToolCall), type(tool) + return Message( + user_id=self.user_id, + agent_id=self.agent_id, + role=self.role, + name=self.name, + text=self.text, + model=self.model, + # tool_calls=[ToolCall(id=tool_call["id"], function=ToolCallFunction(**tool_call["function"])) for tool_call in self.tool_calls] if self.tool_calls else None, + tool_calls=self.tool_calls, + tool_call_id=self.tool_call_id, + created_at=self.created_at, + id=self.id, + ) -Base = declarative_base() +class PassageModel(Base): + """Defines data model for storing Passages (consisting of text, embedding)""" + __tablename__ = "passages" + __table_args__ = {"extend_existing": True} -def get_db_model( - config: MemGPTConfig, - table_name: str, - table_type: TableType, - user_id: str, - agent_id: Optional[str] = None, - dialect="postgresql", -): - # Define a helper function to create or get the model class - def create_or_get_model(class_name, base_model, table_name): - if class_name in globals(): - return globals()[class_name] - Model = type(class_name, (base_model,), {"__tablename__": table_name, "__table_args__": {"extend_existing": True}}) - globals()[class_name] = Model - return Model + # Assuming passage_id is the primary key + id = Column(String, primary_key=True) + user_id = Column(String, nullable=False) + text = Column(String) + doc_id = Column(String) + agent_id = Column(String) + source_id = Column(String) - if table_type == TableType.ARCHIVAL_MEMORY or table_type == TableType.PASSAGES: - # create schema for archival memory - class PassageModel(Base): - """Defines data model for storing Passages (consisting of text, embedding)""" - - __abstract__ = True # this line is necessary - - # Assuming passage_id is the primary key - id = Column(String, primary_key=True) - user_id = Column(String, nullable=False) - text = Column(String) - doc_id = Column(String) - agent_id = Column(String) - source_id = Column(String) - - # vector storage - if dialect == "sqlite": - embedding = Column(CommonVector) - else: - from pgvector.sqlalchemy import Vector - - embedding = mapped_column(Vector(MAX_EMBEDDING_DIM)) - - embedding_config = Column(EmbeddingConfigColumn) - metadata_ = Column(MutableJson) - - # Add a datetime column, with default value as the current time - created_at = Column(DateTime(timezone=True)) - - Index("passage_idx_user", user_id, agent_id, doc_id), - - def __repr__(self): - return f"" - - def to_record(self): - # calls = ( - # [ToolCall(id=tool_call["id"], function=ToolCallFunction(**tool_call["function"])) for tool_call in self.tool_calls] - # if self.tool_calls - # else None - # ) - # if calls: - # assert isinstance(calls[0], ToolCall) - if self.tool_calls and len(self.tool_calls) > 0: - assert isinstance(self.tool_calls[0], ToolCall), type(self.tool_calls[0]) - for tool in self.tool_calls: - assert isinstance(tool, ToolCall), type(tool) - return Message( - user_id=self.user_id, - agent_id=self.agent_id, - role=self.role, - name=self.name, - text=self.text, - model=self.model, - # tool_calls=[ToolCall(id=tool_call["id"], function=ToolCallFunction(**tool_call["function"])) for tool_call in self.tool_calls] if self.tool_calls else None, - tool_calls=self.tool_calls, - tool_call_id=self.tool_call_id, - created_at=self.created_at, - id=self.id, - ) - - """Create database model for table_name""" - class_name = f"{table_name.capitalize()}Model" + dialect - return create_or_get_model(class_name, MessageModel, table_name) + # vector storage + if settings.memgpt_pg_uri_no_default: + from pgvector.sqlalchemy import Vector + embedding = mapped_column(Vector(MAX_EMBEDDING_DIM)) + elif config.archival_storage_type == "sqlite" or config.archival_storage_type == "chroma": + embedding = Column(CommonVector) else: - raise ValueError(f"Table type {table_type} not implemented") + raise ValueError(f"Unsupported archival_storage_type: {config.archival_storage_type}") + embedding_config = Column(EmbeddingConfigColumn) + metadata_ = Column(MutableJson) + + # Add a datetime column, with default value as the current time + created_at = Column(DateTime(timezone=True)) + + Index("passage_idx_user", user_id, agent_id, doc_id), + + def __repr__(self): + return f" APIKey: diff --git a/memgpt/server/server.py b/memgpt/server/server.py index 3a25de0f..e6747b48 100644 --- a/memgpt/server/server.py +++ b/memgpt/server/server.py @@ -133,6 +133,68 @@ class Server(object): raise NotImplementedError +from sqlalchemy import create_engine +from sqlalchemy.orm import declarative_base, sessionmaker + +from memgpt.agent_store.db import MessageModel, PassageModel +from memgpt.config import MemGPTConfig + +# NOTE: hack to see if single session management works +from memgpt.metadata import ( + AgentModel, + AgentSourceMappingModel, + APIKeyModel, + BlockModel, + JobModel, + SourceModel, + ToolModel, + UserModel, +) +from memgpt.settings import settings + +config = MemGPTConfig.load() + +# determine the storage type +if config.recall_storage_type == "postgres": + engine = create_engine(settings.memgpt_pg_uri) +elif config.recall_storage_type == "sqlite": + engine = create_engine("sqlite:///" + os.path.join(config.recall_storage_path, "sqlite.db")) +else: + raise ValueError(f"Unknown recall_storage_type: {config.recall_storage_type}") + +Base = declarative_base() +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) +Base.metadata.create_all( + engine, + tables=[ + UserModel.__table__, + AgentModel.__table__, + SourceModel.__table__, + AgentSourceMappingModel.__table__, + APIKeyModel.__table__, + BlockModel.__table__, + ToolModel.__table__, + JobModel.__table__, + PassageModel.__table__, + MessageModel.__table__, + ], +) + + +# Dependency +def get_db(): + db = SessionLocal() + try: + yield db + finally: + db.close() + + +from contextlib import contextmanager + +db_context = contextmanager(get_db) + + class SyncServer(Server): """Simple single-threaded / blocking server process"""