diff --git a/memgpt/autogen/examples/agent_groupchat.py b/memgpt/autogen/examples/agent_groupchat.py new file mode 100644 index 00000000..e4d8224e --- /dev/null +++ b/memgpt/autogen/examples/agent_groupchat.py @@ -0,0 +1,70 @@ +"""Example of how to add MemGPT into an AutoGen groupchat + +Based on the official AutoGen example here: https://github.com/microsoft/autogen/blob/main/notebook/agentchat_groupchat.ipynb + +Begin by doing: + pip install "pyautogen[teachable]" + pip install pymemgpt + or + pip install -e . (inside the MemGPT home directory) +""" + +import os +import autogen +from memgpt.autogen.memgpt_agent import create_autogen_memgpt_agent + +config_list = [ + { + "model": "gpt-4", + "api_key": os.getenv("OPENAI_API_KEY"), + }, +] + +# If USE_MEMGPT is False, then this example will be the same as the official AutoGen repo (https://github.com/microsoft/autogen/blob/main/notebook/agentchat_groupchat.ipynb) +# If USE_MEMGPT is True, then we swap out the "coder" agent with a MemGPT agent +USE_MEMGPT = True + +llm_config = {"config_list": config_list, "seed": 42} + +# The user agent +user_proxy = autogen.UserProxyAgent( + name="User_proxy", + system_message="A human admin.", + code_execution_config={"last_n_messages": 2, "work_dir": "groupchat"}, + human_input_mode="TERMINATE", # needed? +) + +# The agent playing the role of the product manager (PM) +pm = autogen.AssistantAgent( + name="Product_manager", + system_message="Creative in software product ideas.", + llm_config=llm_config, +) + +if not USE_MEMGPT: + # In the AutoGen example, we create an AssistantAgent to play the role of the coder + coder = autogen.AssistantAgent( + name="Coder", + llm_config=llm_config, + ) + +else: + # In our example, we swap this AutoGen agent with a MemGPT agent + # This MemGPT agent will have all the benefits of MemGPT, ie persistent memory, etc. + coder = create_autogen_memgpt_agent( + "MemGPT_coder", + persona_description="I am a 10x engineer, trained in Python. I was the first engineer at Uber (which I make sure to tell everyone I work with).", + user_description=f"You are participating in a group chat with a user ({user_proxy.name}) and a product manager ({pm.name}).", + # extra options + # interface_kwargs={"debug": True}, + ) + +# Initialize the group chat between the user and two LLM agents (PM and coder) +groupchat = autogen.GroupChat(agents=[user_proxy, pm, coder], messages=[], max_round=12) +manager = autogen.GroupChatManager(groupchat=groupchat, llm_config=llm_config) + +# Begin the group chat with a message from the user +user_proxy.initiate_chat( + manager, + message="I want to design an app to make me one million dollars in one month. Yes, your heard that right.", +) diff --git a/memgpt/autogen/interface.py b/memgpt/autogen/interface.py index 15d9cc27..4f01fd7a 100644 --- a/memgpt/autogen/interface.py +++ b/memgpt/autogen/interface.py @@ -2,6 +2,7 @@ import json import re from colorama import Fore, Style, init + init(autoreset=True) @@ -10,7 +11,6 @@ DEBUG = False # only dumps important messages in the terminal class DummyInterface(object): - def set_message_list(self, message_list): pass @@ -42,10 +42,21 @@ class AutoGenInterface(object): The buffer needs to be wiped before each call to memgpt.agent.step() """ - def __init__(self, message_list=None, show_user_message=False, fancy=True): + def __init__( + self, + message_list=None, + fancy=False, + show_user_message=False, + show_inner_thoughts=False, + show_function_outputs=False, + debug=False, + ): self.message_list = message_list - self.show_user_message = show_user_message self.fancy = fancy # set to false to disable colored outputs + emoji prefixes + self.show_user_message = show_user_message + self.show_inner_thoughts = show_inner_thoughts + self.show_function_outputs = show_function_outputs + self.debug = debug def reset_message_list(self): """Clears the buffer. Call before every agent.step() when using MemGPT+AutoGen""" @@ -53,28 +64,60 @@ class AutoGenInterface(object): async def internal_monologue(self, msg): # ANSI escape code for italic is '\x1B[3m' - message = f'\x1B[3m{Fore.LIGHTBLACK_EX}💭 {msg}{Style.RESET_ALL}' if self.fancy else f'[inner thoughts] {msg}' + if self.debug: + print(f"inner thoughts :: {msg}") + if not self.show_inner_thoughts: + return + message = ( + f"\x1B[3m{Fore.LIGHTBLACK_EX}💭 {msg}{Style.RESET_ALL}" + if self.fancy + else f"[inner thoughts] {msg}" + ) self.message_list.append(message) async def assistant_message(self, msg): - message = f'{Fore.YELLOW}{Style.BRIGHT}🤖 {Fore.YELLOW}{msg}{Style.RESET_ALL}' if self.fancy else msg + if self.debug: + print(f"assistant :: {msg}") + message = ( + f"{Fore.YELLOW}{Style.BRIGHT}🤖 {Fore.YELLOW}{msg}{Style.RESET_ALL}" + if self.fancy + else msg + ) self.message_list.append(message) async def memory_message(self, msg): - message = f'{Fore.LIGHTMAGENTA_EX}{Style.BRIGHT}🧠 {Fore.LIGHTMAGENTA_EX}{msg}{Style.RESET_ALL}' if self.fancy else f'[memory] {msg}' + if self.debug: + print(f"memory :: {msg}") + message = ( + f"{Fore.LIGHTMAGENTA_EX}{Style.BRIGHT}🧠 {Fore.LIGHTMAGENTA_EX}{msg}{Style.RESET_ALL}" + if self.fancy + else f"[memory] {msg}" + ) self.message_list.append(message) async def system_message(self, msg): - message = f'{Fore.MAGENTA}{Style.BRIGHT}🖥️ [system] {Fore.MAGENTA}{msg}{Style.RESET_ALL}' if self.fancy else f'[system] {msg}' + if self.debug: + print(f"system :: {msg}") + message = ( + f"{Fore.MAGENTA}{Style.BRIGHT}🖥️ [system] {Fore.MAGENTA}{msg}{Style.RESET_ALL}" + if self.fancy + else f"[system] {msg}" + ) self.message_list.append(message) async def user_message(self, msg, raw=False): + if self.debug: + print(f"user :: {msg}") if not self.show_user_message: return if isinstance(msg, str): if raw: - message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}' + message = ( + f"{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg}{Style.RESET_ALL}" + if self.fancy + else f"[user] {msg}" + ) self.message_list.append(message) return else: @@ -82,77 +125,137 @@ class AutoGenInterface(object): msg_json = json.loads(msg) except: print(f"Warning: failed to parse user message into json") - message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}' + message = ( + f"{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg}{Style.RESET_ALL}" + if self.fancy + else f"[user] {msg}" + ) self.message_list.append(message) return - if msg_json['type'] == 'user_message': - msg_json.pop('type') - message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}' - elif msg_json['type'] == 'heartbeat': + if msg_json["type"] == "user_message": + msg_json.pop("type") + message = ( + f"{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg_json}{Style.RESET_ALL}" + if self.fancy + else f"[user] {msg}" + ) + elif msg_json["type"] == "heartbeat": if True or DEBUG: - msg_json.pop('type') - message = f'{Fore.GREEN}{Style.BRIGHT}💓 {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[system heartbeat] {msg}' - elif msg_json['type'] == 'system_message': - msg_json.pop('type') - message = f'{Fore.GREEN}{Style.BRIGHT}🖥️ {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[system] {msg}' + msg_json.pop("type") + message = ( + f"{Fore.GREEN}{Style.BRIGHT}💓 {Fore.GREEN}{msg_json}{Style.RESET_ALL}" + if self.fancy + else f"[system heartbeat] {msg}" + ) + elif msg_json["type"] == "system_message": + msg_json.pop("type") + message = ( + f"{Fore.GREEN}{Style.BRIGHT}🖥️ {Fore.GREEN}{msg_json}{Style.RESET_ALL}" + if self.fancy + else f"[system] {msg}" + ) else: - message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}' + message = ( + f"{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg_json}{Style.RESET_ALL}" + if self.fancy + else f"[user] {msg}" + ) self.message_list.append(message) async def function_message(self, msg): + if self.debug: + print(f"function :: {msg}") + if not self.show_function_outputs: + return if isinstance(msg, dict): - message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' + message = ( + f"{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}" + ) self.message_list.append(message) return - if msg.startswith('Success: '): - message = f'{Fore.RED}{Style.BRIGHT}⚡🟢 [function] {Fore.RED}{msg}{Style.RESET_ALL}' if self.fancy else f'[function - OK] {msg}' - elif msg.startswith('Error: '): - message = f'{Fore.RED}{Style.BRIGHT}⚡🔴 [function] {Fore.RED}{msg}{Style.RESET_ALL}' if self.fancy else f'[function - error] {msg}' - elif msg.startswith('Running '): + if msg.startswith("Success: "): + message = ( + f"{Fore.RED}{Style.BRIGHT}⚡🟢 [function] {Fore.RED}{msg}{Style.RESET_ALL}" + if self.fancy + else f"[function - OK] {msg}" + ) + elif msg.startswith("Error: "): + message = ( + f"{Fore.RED}{Style.BRIGHT}⚡🔴 [function] {Fore.RED}{msg}{Style.RESET_ALL}" + if self.fancy + else f"[function - error] {msg}" + ) + elif msg.startswith("Running "): if DEBUG: - message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' if self.fancy else f'[function] {msg}' + message = ( + f"{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}" + if self.fancy + else f"[function] {msg}" + ) else: - if 'memory' in msg: - match = re.search(r'Running (\w+)\((.*)\)', msg) + if "memory" in msg: + match = re.search(r"Running (\w+)\((.*)\)", msg) if match: function_name = match.group(1) function_args = match.group(2) - message = f'{Fore.RED}{Style.BRIGHT}⚡🧠 [function] {Fore.RED}updating memory with {function_name}{Style.RESET_ALL}:' \ - if self.fancy else f'[function] updating memory with {function_name}' + message = ( + f"{Fore.RED}{Style.BRIGHT}⚡🧠 [function] {Fore.RED}updating memory with {function_name}{Style.RESET_ALL}:" + if self.fancy + else f"[function] updating memory with {function_name}" + ) try: msg_dict = eval(function_args) - if function_name == 'archival_memory_search': - message = f'{Fore.RED}\tquery: {msg_dict["query"]}, page: {msg_dict["page"]}' \ - if self.fancy else f'[function] query: {msg_dict["query"]}, page: {msg_dict["page"]}' + if function_name == "archival_memory_search": + message = ( + f'{Fore.RED}\tquery: {msg_dict["query"]}, page: {msg_dict["page"]}' + if self.fancy + else f'[function] query: {msg_dict["query"]}, page: {msg_dict["page"]}' + ) else: - message = f'{Fore.RED}{Style.BRIGHT}\t{Fore.RED} {msg_dict["old_content"]}\n\t{Fore.GREEN}→ {msg_dict["new_content"]}' \ - if self.fancy else f'[old -> new] {msg_dict["old_content"]} -> {msg_dict["new_content"]}' + message = ( + f'{Fore.RED}{Style.BRIGHT}\t{Fore.RED} {msg_dict["old_content"]}\n\t{Fore.GREEN}→ {msg_dict["new_content"]}' + if self.fancy + else f'[old -> new] {msg_dict["old_content"]} -> {msg_dict["new_content"]}' + ) except Exception as e: print(e) message = msg_dict else: print(f"Warning: did not recognize function message") - message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' \ - if self.fancy else f'[function] {msg}' - elif 'send_message' in msg: + message = ( + f"{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}" + if self.fancy + else f"[function] {msg}" + ) + elif "send_message" in msg: # ignore in debug mode message = None else: - message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' \ - if self.fancy else f'[function] {msg}' + message = ( + f"{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}" + if self.fancy + else f"[function] {msg}" + ) else: try: msg_dict = json.loads(msg) if "status" in msg_dict and msg_dict["status"] == "OK": - message = f'{Fore.GREEN}{Style.BRIGHT}⚡ [function] {Fore.GREEN}{msg}{Style.RESET_ALL}' \ - if self.fancy else f'[function] {msg}' + message = ( + f"{Fore.GREEN}{Style.BRIGHT}⚡ [function] {Fore.GREEN}{msg}{Style.RESET_ALL}" + if self.fancy + else f"[function] {msg}" + ) except Exception: print(f"Warning: did not recognize function message {type(msg)} {msg}") - message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' \ - if self.fancy else f'[function] {msg}' + message = ( + f"{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}" + if self.fancy + else f"[function] {msg}" + ) - if message: self.message_list.append(message) + if message: + self.message_list.append(message) diff --git a/memgpt/autogen/memgpt_agent.py b/memgpt/autogen/memgpt_agent.py index 0c22beb1..91adf5d8 100644 --- a/memgpt/autogen/memgpt_agent.py +++ b/memgpt/autogen/memgpt_agent.py @@ -1,12 +1,17 @@ from autogen.agentchat import ConversableAgent, Agent from ..agent import AgentAsync -from .. import system -from .. import constants - import asyncio from typing import Callable, Optional, List, Dict, Union, Any, Tuple +from .interface import AutoGenInterface +from ..persistence_manager import InMemoryStateManager +from .. import system +from .. import constants +from .. import presets +from ..personas import personas +from ..humans import humans + def create_memgpt_autogen_agent_from_config( name: str, @@ -20,24 +25,93 @@ def create_memgpt_autogen_agent_from_config( default_auto_reply: Optional[Union[str, Dict, None]] = "", ): """ - TODO support AutoGen config workflow in a clean way with constructors + TODO support AutoGen config workflow in a clean way with constructors """ raise NotImplementedError -class MemGPTAgent(ConversableAgent): +def create_autogen_memgpt_agent( + autogen_name, + preset=presets.DEFAULT, + model=constants.DEFAULT_MEMGPT_MODEL, + persona_description=personas.DEFAULT, + user_description=humans.DEFAULT, + interface=None, + interface_kwargs={}, + persistence_manager=None, + persistence_manager_kwargs={}, +): + """ + See AutoGenInterface.__init__ for available options you can pass into + `interface_kwargs`. For example, MemGPT's inner monologue and functions are + off by default so that they are not visible to the other agents. You can + turn these on by passing in + ``` + interface_kwargs={ + "debug": True, # to see all MemGPT activity + "show_inner_thoughts: True # to print MemGPT inner thoughts "globally" + # (visible to all AutoGen agents) + } + ``` + """ + interface = AutoGenInterface(**interface_kwargs) if interface is None else interface + persistence_manager = ( + InMemoryStateManager(**persistence_manager_kwargs) + if persistence_manager is None + else persistence_manager + ) + memgpt_agent = presets.use_preset( + preset, + model, + persona_description, + user_description, + interface, + persistence_manager, + ) + + autogen_memgpt_agent = MemGPTAgent( + name=autogen_name, + agent=memgpt_agent, + ) + return autogen_memgpt_agent + + +class MemGPTAgent(ConversableAgent): def __init__( self, name: str, agent: AgentAsync, - skip_verify=False + skip_verify=False, + concat_other_agent_messages=False, ): super().__init__(name) self.agent = agent self.skip_verify = skip_verify - self.register_reply([Agent, None], MemGPTAgent._a_generate_reply_for_user_message) + self.concat_other_agent_messages = concat_other_agent_messages + self.register_reply( + [Agent, None], MemGPTAgent._a_generate_reply_for_user_message + ) self.register_reply([Agent, None], MemGPTAgent._generate_reply_for_user_message) + self.messages_processed_up_to_idx = 0 + + def format_other_agent_message(self, msg): + if "name" in msg: + user_message = f"{msg['name']}: {msg['content']}" + else: + user_message = msg["content"] + return user_message + + def find_last_user_message(self): + last_user_message = None + for msg in self.agent.messages: + if msg["role"] == "user": + last_user_message = msg["content"] + return last_user_message + + def find_new_messages(self, entire_message_list): + """Extract the subset of messages that's actually new""" + return entire_message_list[self.messages_processed_up_to_idx :] def _generate_reply_for_user_message( self, @@ -45,7 +119,11 @@ class MemGPTAgent(ConversableAgent): sender: Optional[Agent] = None, config: Optional[Any] = None, ) -> Tuple[bool, Union[str, Dict, None]]: - return asyncio.run(self._a_generate_reply_for_user_message(messages=messages, sender=sender, config=config)) + return asyncio.run( + self._a_generate_reply_for_user_message( + messages=messages, sender=sender, config=config + ) + ) async def _a_generate_reply_for_user_message( self, @@ -53,41 +131,61 @@ class MemGPTAgent(ConversableAgent): sender: Optional[Agent] = None, config: Optional[Any] = None, ) -> Tuple[bool, Union[str, Dict, None]]: - ret = [] - # for the interface self.agent.interface.reset_message_list() - for msg in messages: - user_message = system.package_user_message(msg['content']) - while True: - new_messages, heartbeat_request, function_failed, token_warning = await self.agent.step(user_message, first_message=False, skip_verify=self.skip_verify) - ret.extend(new_messages) - # Skip user inputs if there's a memory warning, function execution failed, or the agent asked for control - if token_warning: - user_message = system.get_token_limit_warning() - elif function_failed: - user_message = system.get_heartbeat(constants.FUNC_FAILED_HEARTBEAT_MESSAGE) - elif heartbeat_request: - user_message = system.get_heartbeat(constants.REQ_HEARTBEAT_MESSAGE) - else: - break + new_messages = self.find_new_messages(messages) + if len(new_messages) > 1: + if self.concat_other_agent_messages: + # Combine all the other messages into one message + user_message = "\n".join( + [self.format_other_agent_message(m) for m in new_messages] + ) + else: + # Extend the MemGPT message list with multiple 'user' messages, then push the last one with agent.step() + self.agent.messages.extend(new_messages[:-1]) + user_message = new_messages[-1] + else: + user_message = new_messages[0] + + # Package the user message + user_message = system.package_user_message(user_message) + + # Send a single message into MemGPT + while True: + ( + new_messages, + heartbeat_request, + function_failed, + token_warning, + ) = await self.agent.step( + user_message, first_message=False, skip_verify=self.skip_verify + ) + # Skip user inputs if there's a memory warning, function execution failed, or the agent asked for control + if token_warning: + user_message = system.get_token_limit_warning() + elif function_failed: + user_message = system.get_heartbeat( + constants.FUNC_FAILED_HEARTBEAT_MESSAGE + ) + elif heartbeat_request: + user_message = system.get_heartbeat(constants.REQ_HEARTBEAT_MESSAGE) + else: + break # Pass back to AutoGen the pretty-printed calls MemGPT made to the interface pretty_ret = MemGPTAgent.pretty_concat(self.agent.interface.message_list) + self.messages_processed_up_to_idx += len(new_messages) return True, pretty_ret @staticmethod def pretty_concat(messages): """AutoGen expects a single response, but MemGPT may take many steps. - + To accommodate AutoGen, concatenate all of MemGPT's steps into one and return as a single message. """ - ret = { - 'role': 'assistant', - 'content': '' - } + ret = {"role": "assistant", "content": ""} lines = [] for m in messages: lines.append(f"{m}") - ret['content'] = '\n'.join(lines) + ret["content"] = "\n".join(lines) return ret