Merge pull request #123 from cpacker/improve-autogen-examples

Refactored AutoGen integration + added examples folder
This commit is contained in:
Charles Packer
2023-10-25 23:56:29 -07:00
committed by GitHub
3 changed files with 346 additions and 75 deletions

View File

@@ -0,0 +1,70 @@
"""Example of how to add MemGPT into an AutoGen groupchat
Based on the official AutoGen example here: https://github.com/microsoft/autogen/blob/main/notebook/agentchat_groupchat.ipynb
Begin by doing:
pip install "pyautogen[teachable]"
pip install pymemgpt
or
pip install -e . (inside the MemGPT home directory)
"""
import os
import autogen
from memgpt.autogen.memgpt_agent import create_autogen_memgpt_agent
config_list = [
{
"model": "gpt-4",
"api_key": os.getenv("OPENAI_API_KEY"),
},
]
# If USE_MEMGPT is False, then this example will be the same as the official AutoGen repo (https://github.com/microsoft/autogen/blob/main/notebook/agentchat_groupchat.ipynb)
# If USE_MEMGPT is True, then we swap out the "coder" agent with a MemGPT agent
USE_MEMGPT = True
llm_config = {"config_list": config_list, "seed": 42}
# The user agent
user_proxy = autogen.UserProxyAgent(
name="User_proxy",
system_message="A human admin.",
code_execution_config={"last_n_messages": 2, "work_dir": "groupchat"},
human_input_mode="TERMINATE", # needed?
)
# The agent playing the role of the product manager (PM)
pm = autogen.AssistantAgent(
name="Product_manager",
system_message="Creative in software product ideas.",
llm_config=llm_config,
)
if not USE_MEMGPT:
# In the AutoGen example, we create an AssistantAgent to play the role of the coder
coder = autogen.AssistantAgent(
name="Coder",
llm_config=llm_config,
)
else:
# In our example, we swap this AutoGen agent with a MemGPT agent
# This MemGPT agent will have all the benefits of MemGPT, ie persistent memory, etc.
coder = create_autogen_memgpt_agent(
"MemGPT_coder",
persona_description="I am a 10x engineer, trained in Python. I was the first engineer at Uber (which I make sure to tell everyone I work with).",
user_description=f"You are participating in a group chat with a user ({user_proxy.name}) and a product manager ({pm.name}).",
# extra options
# interface_kwargs={"debug": True},
)
# Initialize the group chat between the user and two LLM agents (PM and coder)
groupchat = autogen.GroupChat(agents=[user_proxy, pm, coder], messages=[], max_round=12)
manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config)
# Begin the group chat with a message from the user
user_proxy.initiate_chat(
manager,
message="I want to design an app to make me one million dollars in one month. Yes, your heard that right.",
)

View File

@@ -2,6 +2,7 @@ import json
import re
from colorama import Fore, Style, init
init(autoreset=True)
@@ -10,7 +11,6 @@ DEBUG = False # only dumps important messages in the terminal
class DummyInterface(object):
def set_message_list(self, message_list):
pass
@@ -42,10 +42,21 @@ class AutoGenInterface(object):
The buffer needs to be wiped before each call to memgpt.agent.step()
"""
def __init__(
    self,
    message_list=None,
    fancy=False,
    show_user_message=False,
    show_inner_thoughts=False,
    show_function_outputs=False,
    debug=False,
):
    """Buffering interface that collects MemGPT output for AutoGen.

    Args:
        message_list: external buffer that rendered messages are appended to.
        fancy: set to False to disable colored outputs + emoji prefixes.
        show_user_message: buffer echoed user messages when True.
        show_inner_thoughts: buffer MemGPT's inner monologue when True.
        show_function_outputs: buffer function-call traces when True.
        debug: print every message to stdout regardless of the show_* flags.
    """
    self.message_list = message_list
    self.fancy = fancy  # set to false to disable colored outputs + emoji prefixes
    self.show_user_message = show_user_message
    self.show_inner_thoughts = show_inner_thoughts
    self.show_function_outputs = show_function_outputs
    self.debug = debug
def reset_message_list(self):
"""Clears the buffer. Call before every agent.step() when using MemGPT+AutoGen"""
@@ -53,28 +64,60 @@ class AutoGenInterface(object):
async def internal_monologue(self, msg):
    """Buffer MemGPT's inner monologue (hidden unless show_inner_thoughts)."""
    if self.debug:
        print(f"inner thoughts :: {msg}")
    if not self.show_inner_thoughts:
        return
    # ANSI escape code for italic is '\x1B[3m'
    message = (
        f"\x1B[3m{Fore.LIGHTBLACK_EX}💭 {msg}{Style.RESET_ALL}"
        if self.fancy
        else f"[inner thoughts] {msg}"
    )
    self.message_list.append(message)
async def assistant_message(self, msg):
    """Buffer a message MemGPT's assistant persona sends to the group chat."""
    if self.debug:
        print(f"assistant :: {msg}")
    message = (
        f"{Fore.YELLOW}{Style.BRIGHT}🤖 {Fore.YELLOW}{msg}{Style.RESET_ALL}"
        if self.fancy
        else msg
    )
    self.message_list.append(message)
async def memory_message(self, msg):
    """Buffer a MemGPT memory-related status message."""
    if self.debug:
        print(f"memory :: {msg}")
    message = (
        f"{Fore.LIGHTMAGENTA_EX}{Style.BRIGHT}🧠 {Fore.LIGHTMAGENTA_EX}{msg}{Style.RESET_ALL}"
        if self.fancy
        else f"[memory] {msg}"
    )
    self.message_list.append(message)
async def system_message(self, msg):
    """Buffer a system-level message."""
    if self.debug:
        print(f"system :: {msg}")
    message = (
        f"{Fore.MAGENTA}{Style.BRIGHT}🖥️ [system] {Fore.MAGENTA}{msg}{Style.RESET_ALL}"
        if self.fancy
        else f"[system] {msg}"
    )
    self.message_list.append(message)
async def user_message(self, msg, raw=False):
    """Buffer a user-originated message (hidden unless show_user_message).

    Packaged user messages arrive as JSON strings with a "type" field
    (user_message / heartbeat / system_message); `raw=True` skips parsing.
    """
    if self.debug:
        print(f"user :: {msg}")
    if not self.show_user_message:
        return
    if isinstance(msg, str):
        if raw:
            message = (
                f"{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg}{Style.RESET_ALL}"
                if self.fancy
                else f"[user] {msg}"
            )
            self.message_list.append(message)
            return
        else:
            # NOTE(review): the `try:` line here was hidden by the diff hunk
            # boundary — reconstructed; confirm against upstream.
            try:
                msg_json = json.loads(msg)
            except:
                print(f"Warning: failed to parse user message into json")
                message = (
                    f"{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg}{Style.RESET_ALL}"
                    if self.fancy
                    else f"[user] {msg}"
                )
                self.message_list.append(message)
                return
    # NOTE(review): if msg is not a str, msg_json is unbound here — presumably
    # callers only pass strings; confirm upstream.
    if msg_json["type"] == "user_message":
        msg_json.pop("type")
        message = (
            f"{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg_json}{Style.RESET_ALL}"
            if self.fancy
            else f"[user] {msg}"
        )
    elif msg_json["type"] == "heartbeat":
        if True or DEBUG:
            msg_json.pop("type")
            message = (
                f"{Fore.GREEN}{Style.BRIGHT}💓 {Fore.GREEN}{msg_json}{Style.RESET_ALL}"
                if self.fancy
                else f"[system heartbeat] {msg}"
            )
    elif msg_json["type"] == "system_message":
        msg_json.pop("type")
        message = (
            f"{Fore.GREEN}{Style.BRIGHT}🖥️ {Fore.GREEN}{msg_json}{Style.RESET_ALL}"
            if self.fancy
            else f"[system] {msg}"
        )
    else:
        message = (
            f"{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg_json}{Style.RESET_ALL}"
            if self.fancy
            else f"[user] {msg}"
        )
    self.message_list.append(message)
async def function_message(self, msg):
    """Buffer a MemGPT function-call trace (hidden unless show_function_outputs)."""
    if self.debug:
        print(f"function :: {msg}")
    if not self.show_function_outputs:
        return

    if isinstance(msg, dict):
        message = (
            f"{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}"
        )
        self.message_list.append(message)
        return

    # Initialize so the final `if message:` guard cannot hit an unbound name
    # (e.g. a JSON function result whose status != "OK" set nothing before).
    message = None
    if msg.startswith("Success: "):
        message = (
            f"{Fore.RED}{Style.BRIGHT}⚡🟢 [function] {Fore.RED}{msg}{Style.RESET_ALL}"
            if self.fancy
            else f"[function - OK] {msg}"
        )
    elif msg.startswith("Error: "):
        message = (
            f"{Fore.RED}{Style.BRIGHT}⚡🔴 [function] {Fore.RED}{msg}{Style.RESET_ALL}"
            if self.fancy
            else f"[function - error] {msg}"
        )
    elif msg.startswith("Running "):
        if DEBUG:
            message = (
                f"{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}"
                if self.fancy
                else f"[function] {msg}"
            )
        else:
            if "memory" in msg:
                match = re.search(r"Running (\w+)\((.*)\)", msg)
                if match:
                    function_name = match.group(1)
                    function_args = match.group(2)
                    message = (
                        f"{Fore.RED}{Style.BRIGHT}⚡🧠 [function] {Fore.RED}updating memory with {function_name}{Style.RESET_ALL}:"
                        if self.fancy
                        else f"[function] updating memory with {function_name}"
                    )
                    try:
                        # SECURITY NOTE(review): eval() on LLM-produced argument
                        # text — consider ast.literal_eval/json.loads upstream.
                        msg_dict = eval(function_args)
                        if function_name == "archival_memory_search":
                            message = (
                                f'{Fore.RED}\tquery: {msg_dict["query"]}, page: {msg_dict["page"]}'
                                if self.fancy
                                else f'[function] query: {msg_dict["query"]}, page: {msg_dict["page"]}'
                            )
                        else:
                            message = (
                                f'{Fore.RED}{Style.BRIGHT}\t{Fore.RED} {msg_dict["old_content"]}\n\t{Fore.GREEN} {msg_dict["new_content"]}'
                                if self.fancy
                                else f'[old -> new] {msg_dict["old_content"]} -> {msg_dict["new_content"]}'
                            )
                    except Exception as e:
                        print(e)
                        # NOTE(review): msg_dict may be unbound if eval() itself
                        # raised — confirm against upstream behavior.
                        message = msg_dict
                else:
                    print(f"Warning: did not recognize function message")
                    message = (
                        f"{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}"
                        if self.fancy
                        else f"[function] {msg}"
                    )
            elif "send_message" in msg:
                # ignore in debug mode
                message = None
            else:
                message = (
                    f"{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}"
                    if self.fancy
                    else f"[function] {msg}"
                )
    else:
        try:
            msg_dict = json.loads(msg)
            if "status" in msg_dict and msg_dict["status"] == "OK":
                message = (
                    f"{Fore.GREEN}{Style.BRIGHT}⚡ [function] {Fore.GREEN}{msg}{Style.RESET_ALL}"
                    if self.fancy
                    else f"[function] {msg}"
                )
        except Exception:
            print(f"Warning: did not recognize function message {type(msg)} {msg}")
            message = (
                f"{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}"
                if self.fancy
                else f"[function] {msg}"
            )

    if message:
        self.message_list.append(message)

View File

@@ -1,12 +1,17 @@
from autogen.agentchat import ConversableAgent, Agent
from ..agent import AgentAsync
from .. import system
from .. import constants
import asyncio
from typing import Callable, Optional, List, Dict, Union, Any, Tuple
from .interface import AutoGenInterface
from ..persistence_manager import InMemoryStateManager
from .. import system
from .. import constants
from .. import presets
from ..personas import personas
from ..humans import humans
def create_memgpt_autogen_agent_from_config(
name: str,
@@ -20,24 +25,93 @@ def create_memgpt_autogen_agent_from_config(
default_auto_reply: Optional[Union[str, Dict, None]] = "",
):
"""
TODO support AutoGen config workflow in a clean way with constructors
TODO support AutoGen config workflow in a clean way with constructors
"""
raise NotImplementedError
class MemGPTAgent(ConversableAgent):
def create_autogen_memgpt_agent(
    autogen_name,
    preset=presets.DEFAULT,
    model=constants.DEFAULT_MEMGPT_MODEL,
    persona_description=personas.DEFAULT,
    user_description=humans.DEFAULT,
    interface=None,
    interface_kwargs=None,
    persistence_manager=None,
    persistence_manager_kwargs=None,
):
    """Create an AutoGen-compatible agent wrapped around a MemGPT agent.

    See AutoGenInterface.__init__ for available options you can pass into
    `interface_kwargs`. For example, MemGPT's inner monologue and functions are
    off by default so that they are not visible to the other agents. You can
    turn these on by passing in

    ```
    interface_kwargs={
        "debug": True,  # to see all MemGPT activity
        "show_inner_thoughts": True,  # to print MemGPT inner thoughts "globally"
                                      # (visible to all AutoGen agents)
    }
    ```
    """
    # Default to None rather than a `{}` default argument: mutable defaults are
    # shared across calls and would leak kwargs between agent constructions.
    interface_kwargs = {} if interface_kwargs is None else interface_kwargs
    persistence_manager_kwargs = (
        {} if persistence_manager_kwargs is None else persistence_manager_kwargs
    )
    interface = AutoGenInterface(**interface_kwargs) if interface is None else interface
    persistence_manager = (
        InMemoryStateManager(**persistence_manager_kwargs)
        if persistence_manager is None
        else persistence_manager
    )
    memgpt_agent = presets.use_preset(
        preset,
        model,
        persona_description,
        user_description,
        interface,
        persistence_manager,
    )
    autogen_memgpt_agent = MemGPTAgent(
        name=autogen_name,
        agent=memgpt_agent,
    )
    return autogen_memgpt_agent
class MemGPTAgent(ConversableAgent):
    """AutoGen ConversableAgent backed by a MemGPT AgentAsync."""

    def __init__(
        self,
        name: str,
        agent: AgentAsync,
        skip_verify=False,
        concat_other_agent_messages=False,
    ):
        super().__init__(name)
        self.agent = agent
        self.skip_verify = skip_verify
        # When True, all unseen messages are joined into one user message
        # instead of being appended to MemGPT's history individually.
        self.concat_other_agent_messages = concat_other_agent_messages
        # Register both the async and sync reply generators with AutoGen.
        self.register_reply(
            [Agent, None], MemGPTAgent._a_generate_reply_for_user_message
        )
        self.register_reply([Agent, None], MemGPTAgent._generate_reply_for_user_message)
        # Index into the AutoGen chat history marking what MemGPT has consumed.
        self.messages_processed_up_to_idx = 0
def format_other_agent_message(self, msg):
    """Render another agent's chat entry, prefixing the sender name when present."""
    content = msg["content"]
    return f"{msg['name']}: {content}" if "name" in msg else content
def find_last_user_message(self):
    """Return the content of the most recent 'user' message, or None if absent."""
    for entry in reversed(self.agent.messages):
        if entry["role"] == "user":
            return entry["content"]
    return None
def find_new_messages(self, entire_message_list):
    """Extract the subset of messages that's actually new."""
    already_processed = self.messages_processed_up_to_idx
    return entire_message_list[already_processed:]
def _generate_reply_for_user_message(
    self,
    messages: Optional[List[Dict]] = None,
    sender: Optional[Agent] = None,
    config: Optional[Any] = None,
) -> Tuple[bool, Union[str, Dict, None]]:
    """Synchronous wrapper around the async reply generator.

    NOTE(review): the `messages` parameter line was hidden by the diff hunk
    boundary; reconstructed from the async twin's signature — confirm upstream.
    """
    return asyncio.run(
        self._a_generate_reply_for_user_message(
            messages=messages, sender=sender, config=config
        )
    )
async def _a_generate_reply_for_user_message(
    self,
    messages: Optional[List[Dict]] = None,
    sender: Optional[Agent] = None,
    config: Optional[Any] = None,
) -> Tuple[bool, Union[str, Dict, None]]:
    """Feed unseen AutoGen messages into MemGPT and return its combined reply.

    Returns (True, reply_dict) per AutoGen's reply-function contract.
    """
    # The interface buffer accumulates MemGPT output for this turn only, so it
    # needs to be wiped before each call to memgpt.agent.step().
    self.agent.interface.reset_message_list()

    new_messages = self.find_new_messages(messages)
    if len(new_messages) > 1:
        if self.concat_other_agent_messages:
            # Combine all the other messages into one message
            user_message = "\n".join(
                [self.format_other_agent_message(m) for m in new_messages]
            )
        else:
            # Extend the MemGPT message list with multiple 'user' messages, then push the last one with agent.step()
            self.agent.messages.extend(new_messages[:-1])
            user_message = new_messages[-1]
    else:
        # NOTE(review): raises IndexError if no new messages — presumably AutoGen
        # only invokes reply functions when there is at least one; confirm.
        user_message = new_messages[0]

    # Package the user message
    user_message = system.package_user_message(user_message)

    # Send a single message into MemGPT
    while True:
        (
            new_messages,
            heartbeat_request,
            function_failed,
            token_warning,
        ) = await self.agent.step(
            user_message, first_message=False, skip_verify=self.skip_verify
        )
        # Skip user inputs if there's a memory warning, function execution failed, or the agent asked for control
        if token_warning:
            user_message = system.get_token_limit_warning()
        elif function_failed:
            user_message = system.get_heartbeat(
                constants.FUNC_FAILED_HEARTBEAT_MESSAGE
            )
        elif heartbeat_request:
            user_message = system.get_heartbeat(constants.REQ_HEARTBEAT_MESSAGE)
        else:
            break

    # Pass back to AutoGen the pretty-printed calls MemGPT made to the interface
    pretty_ret = MemGPTAgent.pretty_concat(self.agent.interface.message_list)
    # NOTE(review): this advances by the count of MemGPT's *output* messages from
    # the final step, not the AutoGen inputs consumed — looks suspicious; confirm.
    self.messages_processed_up_to_idx += len(new_messages)
    return True, pretty_ret
@staticmethod
def pretty_concat(messages):
    """AutoGen expects a single response, but MemGPT may take many steps.

    To accommodate AutoGen, concatenate all of MemGPT's steps into one and
    return as a single chat-message dict.
    """
    ret = {"role": "assistant", "content": ""}
    # str-coerce each buffered entry before joining.
    lines = [f"{m}" for m in messages]
    ret["content"] = "\n".join(lines)
    return ret