Merge pull request #64 from cpacker/autogen
Add Autogen MemGPT agent Co-authored-By: Charles Packer <contact@charlespacker.com>
This commit is contained in:
0
memgpt/autogen/__init__.py
Normal file
0
memgpt/autogen/__init__.py
Normal file
158
memgpt/autogen/interface.py
Normal file
158
memgpt/autogen/interface.py
Normal file
@@ -0,0 +1,158 @@
|
||||
import json
|
||||
import re
|
||||
|
||||
from colorama import Fore, Style, init
|
||||
init(autoreset=True)
|
||||
|
||||
|
||||
# DEBUG = True # puts full message outputs in the terminal
|
||||
DEBUG = False # only dumps important messages in the terminal
|
||||
|
||||
|
||||
class DummyInterface(object):
|
||||
|
||||
def set_message_list(self, message_list):
|
||||
pass
|
||||
|
||||
async def internal_monologue(self, msg):
|
||||
pass
|
||||
|
||||
async def assistant_message(self, msg):
|
||||
pass
|
||||
|
||||
async def memory_message(self, msg):
|
||||
pass
|
||||
|
||||
async def system_message(self, msg):
|
||||
pass
|
||||
|
||||
async def user_message(self, msg, raw=False):
|
||||
pass
|
||||
|
||||
async def function_message(self, msg):
|
||||
pass
|
||||
|
||||
|
||||
class AutoGenInterface(object):
|
||||
"""AutoGen expects a single action return in its step loop, but MemGPT may take many actions.
|
||||
|
||||
To support AutoGen, we keep a buffer of all the steps that were taken using the interface abstraction,
|
||||
then we concatenate it all and package back as a single 'assistant' ChatCompletion response.
|
||||
|
||||
The buffer needs to be wiped before each call to memgpt.agent.step()
|
||||
"""
|
||||
|
||||
def __init__(self, message_list=None, show_user_message=False, fancy=True):
|
||||
self.message_list = message_list
|
||||
self.show_user_message = show_user_message
|
||||
self.fancy = fancy # set to false to disable colored outputs + emoji prefixes
|
||||
|
||||
def reset_message_list(self):
|
||||
"""Clears the buffer. Call before every agent.step() when using MemGPT+AutoGen"""
|
||||
self.message_list = []
|
||||
|
||||
async def internal_monologue(self, msg):
|
||||
# ANSI escape code for italic is '\x1B[3m'
|
||||
message = f'\x1B[3m{Fore.LIGHTBLACK_EX}💭 {msg}{Style.RESET_ALL}' if self.fancy else f'[inner thoughts] {msg}'
|
||||
self.message_list.append(message)
|
||||
|
||||
async def assistant_message(self, msg):
|
||||
message = f'{Fore.YELLOW}{Style.BRIGHT}🤖 {Fore.YELLOW}{msg}{Style.RESET_ALL}' if self.fancy else msg
|
||||
self.message_list.append(message)
|
||||
|
||||
async def memory_message(self, msg):
|
||||
message = f'{Fore.LIGHTMAGENTA_EX}{Style.BRIGHT}🧠 {Fore.LIGHTMAGENTA_EX}{msg}{Style.RESET_ALL}' if self.fancy else f'[memory] {msg}'
|
||||
self.message_list.append(message)
|
||||
|
||||
async def system_message(self, msg):
|
||||
message = f'{Fore.MAGENTA}{Style.BRIGHT}🖥️ [system] {Fore.MAGENTA}{msg}{Style.RESET_ALL}' if self.fancy else f'[system] {msg}'
|
||||
self.message_list.append(message)
|
||||
|
||||
async def user_message(self, msg, raw=False):
|
||||
if not self.show_user_message:
|
||||
return
|
||||
|
||||
if isinstance(msg, str):
|
||||
if raw:
|
||||
message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}'
|
||||
self.message_list.append(message)
|
||||
return
|
||||
else:
|
||||
try:
|
||||
msg_json = json.loads(msg)
|
||||
except:
|
||||
print(f"Warning: failed to parse user message into json")
|
||||
message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}'
|
||||
self.message_list.append(message)
|
||||
return
|
||||
|
||||
if msg_json['type'] == 'user_message':
|
||||
msg_json.pop('type')
|
||||
message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}'
|
||||
elif msg_json['type'] == 'heartbeat':
|
||||
if True or DEBUG:
|
||||
msg_json.pop('type')
|
||||
message = f'{Fore.GREEN}{Style.BRIGHT}💓 {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[system heartbeat] {msg}'
|
||||
elif msg_json['type'] == 'system_message':
|
||||
msg_json.pop('type')
|
||||
message = f'{Fore.GREEN}{Style.BRIGHT}🖥️ {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[system] {msg}'
|
||||
else:
|
||||
message = f'{Fore.GREEN}{Style.BRIGHT}🧑 {Fore.GREEN}{msg_json}{Style.RESET_ALL}' if self.fancy else f'[user] {msg}'
|
||||
|
||||
self.message_list.append(message)
|
||||
|
||||
async def function_message(self, msg):
|
||||
|
||||
if isinstance(msg, dict):
|
||||
message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}'
|
||||
self.message_list.append(message)
|
||||
return
|
||||
|
||||
if msg.startswith('Success: '):
|
||||
message = f'{Fore.RED}{Style.BRIGHT}⚡🟢 [function] {Fore.RED}{msg}{Style.RESET_ALL}' if self.fancy else f'[function - OK] {msg}'
|
||||
elif msg.startswith('Error: '):
|
||||
message = f'{Fore.RED}{Style.BRIGHT}⚡🔴 [function] {Fore.RED}{msg}{Style.RESET_ALL}' if self.fancy else f'[function - error] {msg}'
|
||||
elif msg.startswith('Running '):
|
||||
if DEBUG:
|
||||
message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' if self.fancy else f'[function] {msg}'
|
||||
else:
|
||||
if 'memory' in msg:
|
||||
match = re.search(r'Running (\w+)\((.*)\)', msg)
|
||||
if match:
|
||||
function_name = match.group(1)
|
||||
function_args = match.group(2)
|
||||
message = f'{Fore.RED}{Style.BRIGHT}⚡🧠 [function] {Fore.RED}updating memory with {function_name}{Style.RESET_ALL}:' \
|
||||
if self.fancy else f'[function] updating memory with {function_name}'
|
||||
try:
|
||||
msg_dict = eval(function_args)
|
||||
if function_name == 'archival_memory_search':
|
||||
message = f'{Fore.RED}\tquery: {msg_dict["query"]}, page: {msg_dict["page"]}' \
|
||||
if self.fancy else f'[function] query: {msg_dict["query"]}, page: {msg_dict["page"]}'
|
||||
else:
|
||||
message = f'{Fore.RED}{Style.BRIGHT}\t{Fore.RED} {msg_dict["old_content"]}\n\t{Fore.GREEN}→ {msg_dict["new_content"]}' \
|
||||
if self.fancy else f'[old -> new] {msg_dict["old_content"]} -> {msg_dict["new_content"]}'
|
||||
except Exception as e:
|
||||
print(e)
|
||||
message = msg_dict
|
||||
else:
|
||||
print(f"Warning: did not recognize function message")
|
||||
message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' \
|
||||
if self.fancy else f'[function] {msg}'
|
||||
elif 'send_message' in msg:
|
||||
# ignore in debug mode
|
||||
message = None
|
||||
else:
|
||||
message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' \
|
||||
if self.fancy else f'[function] {msg}'
|
||||
else:
|
||||
try:
|
||||
msg_dict = json.loads(msg)
|
||||
if "status" in msg_dict and msg_dict["status"] == "OK":
|
||||
message = f'{Fore.GREEN}{Style.BRIGHT}⚡ [function] {Fore.GREEN}{msg}{Style.RESET_ALL}' \
|
||||
if self.fancy else f'[function] {msg}'
|
||||
except Exception:
|
||||
print(f"Warning: did not recognize function message {type(msg)} {msg}")
|
||||
message = f'{Fore.RED}{Style.BRIGHT}⚡ [function] {Fore.RED}{msg}{Style.RESET_ALL}' \
|
||||
if self.fancy else f'[function] {msg}'
|
||||
|
||||
if message: self.message_list.append(message)
|
||||
93
memgpt/autogen/memgpt_agent.py
Normal file
93
memgpt/autogen/memgpt_agent.py
Normal file
@@ -0,0 +1,93 @@
|
||||
from autogen.agentchat import ConversableAgent, Agent
|
||||
from ..agent import AgentAsync
|
||||
|
||||
from .. import system
|
||||
from .. import constants
|
||||
|
||||
import asyncio
|
||||
from typing import Callable, Optional, List, Dict, Union, Any, Tuple
|
||||
|
||||
|
||||
def create_memgpt_autogen_agent_from_config(
|
||||
name: str,
|
||||
system_message: Optional[str] = "You are a helpful AI Assistant.",
|
||||
is_termination_msg: Optional[Callable[[Dict], bool]] = None,
|
||||
max_consecutive_auto_reply: Optional[int] = None,
|
||||
human_input_mode: Optional[str] = "TERMINATE",
|
||||
function_map: Optional[Dict[str, Callable]] = None,
|
||||
code_execution_config: Optional[Union[Dict, bool]] = None,
|
||||
llm_config: Optional[Union[Dict, bool]] = None,
|
||||
default_auto_reply: Optional[Union[str, Dict, None]] = "",
|
||||
):
|
||||
"""
|
||||
TODO support AutoGen config workflow in a clean way with constructors
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class MemGPTAgent(ConversableAgent):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
agent: AgentAsync,
|
||||
skip_verify=False
|
||||
):
|
||||
super().__init__(name)
|
||||
self.agent = agent
|
||||
self.skip_verify = skip_verify
|
||||
self.register_reply([Agent, None], MemGPTAgent._a_generate_reply_for_user_message)
|
||||
self.register_reply([Agent, None], MemGPTAgent._generate_reply_for_user_message)
|
||||
|
||||
def _generate_reply_for_user_message(
|
||||
self,
|
||||
messages: Optional[List[Dict]] = None,
|
||||
sender: Optional[Agent] = None,
|
||||
config: Optional[Any] = None,
|
||||
) -> Tuple[bool, Union[str, Dict, None]]:
|
||||
return asyncio.run(self._a_generate_reply_for_user_message(messages=messages, sender=sender, config=config))
|
||||
|
||||
async def _a_generate_reply_for_user_message(
|
||||
self,
|
||||
messages: Optional[List[Dict]] = None,
|
||||
sender: Optional[Agent] = None,
|
||||
config: Optional[Any] = None,
|
||||
) -> Tuple[bool, Union[str, Dict, None]]:
|
||||
ret = []
|
||||
# for the interface
|
||||
self.agent.interface.reset_message_list()
|
||||
|
||||
for msg in messages:
|
||||
user_message = system.package_user_message(msg['content'])
|
||||
while True:
|
||||
new_messages, heartbeat_request, function_failed, token_warning = await self.agent.step(user_message, first_message=False, skip_verify=self.skip_verify)
|
||||
ret.extend(new_messages)
|
||||
# Skip user inputs if there's a memory warning, function execution failed, or the agent asked for control
|
||||
if token_warning:
|
||||
user_message = system.get_token_limit_warning()
|
||||
elif function_failed:
|
||||
user_message = system.get_heartbeat(constants.FUNC_FAILED_HEARTBEAT_MESSAGE)
|
||||
elif heartbeat_request:
|
||||
user_message = system.get_heartbeat(constants.REQ_HEARTBEAT_MESSAGE)
|
||||
else:
|
||||
break
|
||||
|
||||
# Pass back to AutoGen the pretty-printed calls MemGPT made to the interface
|
||||
pretty_ret = MemGPTAgent.pretty_concat(self.agent.interface.message_list)
|
||||
return True, pretty_ret
|
||||
|
||||
@staticmethod
|
||||
def pretty_concat(messages):
|
||||
"""AutoGen expects a single response, but MemGPT may take many steps.
|
||||
|
||||
To accomadate AutoGen, concatenate all of MemGPT's steps into one and return as a single message.
|
||||
"""
|
||||
ret = {
|
||||
'role': 'assistant',
|
||||
'content': ''
|
||||
}
|
||||
lines = []
|
||||
for m in messages:
|
||||
lines.append(f"{m}")
|
||||
ret['content'] = '\n'.join(lines)
|
||||
return ret
|
||||
Reference in New Issue
Block a user