diff --git a/README.md b/README.md
index 7bca2a39..a7b5aec7 100644
--- a/README.md
+++ b/README.md
@@ -3,16 +3,27 @@
# [MemGPT](https://memgpt.ai)
-
+
[](https://memgpt.ai)
[](https://discord.gg/9GEQrxmVyE)
[](https://arxiv.org/abs/2310.08560)
-Teaching LLMs memory management for unbounded context
-
-
+
+ Create perpetual chatbots 🤖 with self-editing memory!
+
+

+
+
+
+
+ Chat with your data 🗃️ - try talking to the LlamaIndex API docs!
+
+

+
+
+
## Quick setup
Install dependencies:
diff --git a/interface.py b/interface.py
index be9951dc..b8729b1e 100644
--- a/interface.py
+++ b/interface.py
@@ -71,9 +71,13 @@ async def function_message(msg):
print(f'{Fore.RED}{Style.BRIGHT}⚡🧠 [function] {Fore.RED}updating memory with {function_name}{Style.RESET_ALL}:')
try:
msg_dict = eval(function_args)
- print(f'{Fore.RED}{Style.BRIGHT}\t{Fore.RED} {msg_dict["old_content"]}\n\t→ {msg_dict["new_content"]}')
+ if function_name == 'archival_memory_search':
+ print(f'{Fore.RED}\tquery: {msg_dict["query"]}, page: {msg_dict["page"]}')
+ else:
+ print(f'{Fore.RED}{Style.BRIGHT}\t{Fore.RED} {msg_dict["old_content"]}\n\t{Fore.GREEN}→ {msg_dict["new_content"]}')
except Exception as e:
- print(e)
+ printd(e)
+ printd(msg_dict)
pass
else:
printd(f"Warning: did not recognize function message")
diff --git a/main.py b/main.py
index 97e32e74..a3824f78 100644
--- a/main.py
+++ b/main.py
@@ -16,7 +16,7 @@ import memgpt.presets as presets
import memgpt.constants as constants
import memgpt.personas.personas as personas
import memgpt.humans.humans as humans
-from memgpt.persistence_manager import InMemoryStateManager as persistence_manager
+from memgpt.persistence_manager import InMemoryStateManager, InMemoryStateManagerWithFaiss
FLAGS = flags.FLAGS
flags.DEFINE_string("persona", default=personas.DEFAULT, required=False, help="Specify persona")
@@ -24,6 +24,7 @@ flags.DEFINE_string("human", default=humans.DEFAULT, required=False, help="Speci
flags.DEFINE_string("model", default=constants.DEFAULT_MEMGPT_MODEL, required=False, help="Specify the LLM model")
flags.DEFINE_boolean("first", default=False, required=False, help="Use -first to send the first message in the sequence")
flags.DEFINE_boolean("debug", default=False, required=False, help="Use -debug to enable debugging output")
+flags.DEFINE_string("archival_storage_faiss_path", default="", required=False, help="Specify archival storage to load (a folder with a .index and .json describing documents to be loaded)")
def clear_line():
@@ -43,7 +44,12 @@ async def main():
logging.getLogger().setLevel(logging.DEBUG)
print("Running... [exit by typing '/exit']")
- memgpt_agent = presets.use_preset(presets.DEFAULT, FLAGS.model, personas.get_persona_text(FLAGS.persona), humans.get_human_text(), interface, persistence_manager())
+ if FLAGS.archival_storage_faiss_path:
+ index, archival_database = utils.prepare_archival_index(FLAGS.archival_storage_faiss_path)
+ persistence_manager = InMemoryStateManagerWithFaiss(index, archival_database)
+ else:
+ persistence_manager = InMemoryStateManager()
+ memgpt_agent = presets.use_preset(presets.DEFAULT, FLAGS.model, personas.get_persona_text(FLAGS.persona), humans.get_human_text(FLAGS.human), interface, persistence_manager)
print_messages = interface.print_messages
await print_messages(memgpt_agent.messages)
diff --git a/memgpt/agent.py b/memgpt/agent.py
index 593b5e75..a64c29ec 100644
--- a/memgpt/agent.py
+++ b/memgpt/agent.py
@@ -624,7 +624,7 @@ class AgentAsync(object):
return None
async def recall_memory_search(self, query, count=5, page=0):
- results, total = await self.persistence_manager.recall_memory.text_search(query, count=count, start=page)
+ results, total = await self.persistence_manager.recall_memory.text_search(query, count=count, start=page*count)
num_pages = math.ceil(total / count) - 1 # 0 index
if len(results) == 0:
results_str = f"No results found."
@@ -635,7 +635,7 @@ class AgentAsync(object):
return results_str
async def recall_memory_search_date(self, start_date, end_date, count=5, page=0):
- results, total = await self.persistence_manager.recall_memory.date_search(start_date, end_date, count=count, start=page)
+ results, total = await self.persistence_manager.recall_memory.date_search(start_date, end_date, count=count, start=page*count)
num_pages = math.ceil(total / count) - 1 # 0 index
if len(results) == 0:
results_str = f"No results found."
@@ -650,7 +650,7 @@ class AgentAsync(object):
return None
async def archival_memory_search(self, query, count=5, page=0):
- results, total = await self.persistence_manager.archival_memory.search(query, count=count, start=page)
+ results, total = await self.persistence_manager.archival_memory.search(query, count=count, start=page*count)
num_pages = math.ceil(total / count) - 1 # 0 index
if len(results) == 0:
results_str = f"No results found."
diff --git a/memgpt/memory.py b/memgpt/memory.py
index 272dd683..fb064959 100644
--- a/memgpt/memory.py
+++ b/memgpt/memory.py
@@ -1,6 +1,8 @@
from abc import ABC, abstractmethod
import datetime
import re
+import faiss
+import numpy as np
from .utils import cosine_similarity, get_local_time, printd
from .prompts.gpt_summarize import SYSTEM as SUMMARY_PROMPT_SYSTEM
@@ -239,6 +241,85 @@ class DummyArchivalMemoryWithEmbeddings(DummyArchivalMemory):
return matches, len(matches)
+class DummyArchivalMemoryWithFaiss(DummyArchivalMemory):
+ """Dummy in-memory version of an archival memory database, using a FAISS
+ index for fast nearest-neighbors embedding search.
+
+ Archival memory is effectively "infinite" overflow for core memory,
+ and is read-only via string queries.
+
+ Archival Memory: A more structured and deep storage space for the AI's reflections,
+ insights, or any other data that doesn't fit into the active memory but
+ is essential enough not to be left only to the recall memory.
+ """
+
+ def __init__(self, index=None, archival_memory_database=None, embedding_model='text-embedding-ada-002', k=100):
+ if index is None:
+ self.index = faiss.IndexFlatL2(1536) # openai embedding vector size.
+ else:
+ self.index = index
+ self.k = k
+ self._archive = [] if archival_memory_database is None else archival_memory_database # consists of {'content': str} dicts
+ self.embedding_model = embedding_model
+ self.embeddings_dict = {}
+ self.search_results = {}
+
+ def __len__(self):
+ return len(self._archive)
+
+ async def insert(self, memory_string, embedding=None):
+ if embedding is None:
+ # Get the embedding
+ embedding = await async_get_embedding_with_backoff(memory_string, model=self.embedding_model)
+ print(f"Got an embedding, type {type(embedding)}, len {len(embedding)}")
+
+ self._archive.append({
+ # can eventually upgrade to adding semantic tags, etc
+ 'timestamp': get_local_time(),
+ 'content': memory_string,
+ })
+ embedding = np.array([embedding]).astype('float32')
+ self.index.add(embedding)
+
+ async def search(self, query_string, count=None, start=None):
+ """Simple embedding-based search (inefficient, no caching)"""
+ # see: https://github.com/openai/openai-cookbook/blob/main/examples/Semantic_text_search_using_embeddings.ipynb
+
+ # query_embedding = get_embedding(query_string, model=self.embedding_model)
+ # our wrapped version supports backoff/rate-limits
+ if query_string in self.embeddings_dict:
+ query_embedding = self.embeddings_dict[query_string]
+ search_result = self.search_results[query_string]
+ else:
+ query_embedding = await async_get_embedding_with_backoff(query_string, model=self.embedding_model)
+ _, indices = self.index.search(np.array([np.array(query_embedding, dtype=np.float32)]), self.k)
+ search_result = [self._archive[idx] if idx < len(self._archive) else "" for idx in indices[0]]
+ self.embeddings_dict[query_string] = query_embedding
+ self.search_results[query_string] = search_result
+
+ if start is not None and count is not None:
+ toprint = search_result[start:start+count]
+ else:
+ if len(search_result) >= 5:
+ toprint = search_result[:5]
+ else:
+ toprint = search_result
+ printd(f"archive_memory.search (vector-based): search for query '{query_string}' returned the following results ({start}--{start+5}/{len(search_result)}) and scores:\n{str([t[:60] if len(t) > 60 else t for t in toprint])}")
+
+ # Extract the sorted archive without the scores
+ matches = search_result
+
+ # start/count support paging through results
+ if start is not None and count is not None:
+ return matches[start:start+count], len(matches)
+ elif start is None and count is not None:
+ return matches[:count], len(matches)
+ elif start is not None and count is None:
+ return matches[start:], len(matches)
+ else:
+ return matches, len(matches)
+
+
class RecallMemory(ABC):
@abstractmethod
diff --git a/memgpt/persistence_manager.py b/memgpt/persistence_manager.py
index 3c8e24f2..575741b3 100644
--- a/memgpt/persistence_manager.py
+++ b/memgpt/persistence_manager.py
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
-from .memory import DummyRecallMemory, DummyRecallMemoryWithEmbeddings, DummyArchivalMemory, DummyArchivalMemoryWithEmbeddings
+from .memory import DummyRecallMemory, DummyRecallMemoryWithEmbeddings, DummyArchivalMemory, DummyArchivalMemoryWithEmbeddings, DummyArchivalMemoryWithFaiss
from .utils import get_local_time, printd
@@ -88,4 +88,26 @@ class InMemoryStateManager(PersistenceManager):
class InMemoryStateManagerWithEmbeddings(InMemoryStateManager):
archival_memory_cls = DummyArchivalMemoryWithEmbeddings
- recall_memory_cls = DummyRecallMemoryWithEmbeddings
\ No newline at end of file
+ recall_memory_cls = DummyRecallMemoryWithEmbeddings
+
+class InMemoryStateManagerWithFaiss(InMemoryStateManager):
+ archival_memory_cls = DummyArchivalMemoryWithFaiss
+ recall_memory_cls = DummyRecallMemoryWithEmbeddings
+
+ def __init__(self, archival_index, archival_memory_db, a_k=100):
+ super().__init__()
+ self.archival_index = archival_index
+ self.archival_memory_db = archival_memory_db
+ self.a_k = a_k
+
+ def init(self, agent):
+ print(f"Initializing InMemoryStateManager with agent object")
+ self.all_messages = [{'timestamp': get_local_time(), 'message': msg} for msg in agent.messages.copy()]
+ self.messages = [{'timestamp': get_local_time(), 'message': msg} for msg in agent.messages.copy()]
+ self.memory = agent.memory
+ print(f"InMemoryStateManager.all_messages.len = {len(self.all_messages)}")
+ print(f"InMemoryStateManager.messages.len = {len(self.messages)}")
+
+ # Persistence manager also handles DB-related state
+ self.recall_memory = self.recall_memory_cls(message_database=self.all_messages)
+ self.archival_memory = self.archival_memory_cls(index=self.archival_index, archival_memory_database=self.archival_memory_db, k=self.a_k)
diff --git a/memgpt/personas/examples/docqa/README.md b/memgpt/personas/examples/docqa/README.md
new file mode 100644
index 00000000..2c5ca318
--- /dev/null
+++ b/memgpt/personas/examples/docqa/README.md
@@ -0,0 +1,35 @@
+# MemGPT over LlamaIndex API Docs
+
+MemGPT enables you to chat with your data -- try running this example to talk to the LlamaIndex API docs!
+
+1.
+ a. Download LlamaIndex API docs and FAISS index from [HuggingFace](https://huggingface.co/datasets/MemGPT/llamaindex-api-docs).
+ ```bash
+ # Make sure you have git-lfs installed (https://git-lfs.com)
+ git lfs install
+ git clone https://huggingface.co/datasets/MemGPT/llamaindex-api-docs
+ ```
+
+ **-- OR --**
+
+ b. Build the index:
+ 1. Build `llama_index` API docs with `make text`. Instructions [here](https://github.com/run-llama/llama_index/blob/main/docs/DOCS_README.md). Copy over the generated `_build/text` folder to this directory.
+ 2. Generate embeddings and FAISS index.
+ ```bash
+ python3 scrape_docs.py
+ python3 generate_embeddings_for_docs.py all_docs.jsonl
+ python3 build_index.py --embedding_files all_docs.embeddings.jsonl --output_index_file all_docs.index
+ ```
+
+2. In the root `MemGPT` directory, run
+ ```bash
+   python3 main.py --archival_storage_faiss_path=ARCHIVAL_STORAGE_FAISS_PATH --persona=memgpt_doc --human=basic
+ ```
+ where `ARCHIVAL_STORAGE_FAISS_PATH` is the directory where `all_docs.jsonl` and `all_docs.index` are located.
+ If you downloaded from HuggingFace, it will be `memgpt/personas/docqa/llamaindex-api-docs`.
+ If you built the index yourself, it will be `memgpt/personas/docqa`.
+
+## Demo
+
+

+
diff --git a/memgpt/personas/examples/docqa/build_index.py b/memgpt/personas/examples/docqa/build_index.py
new file mode 100644
index 00000000..2dd94708
--- /dev/null
+++ b/memgpt/personas/examples/docqa/build_index.py
@@ -0,0 +1,45 @@
+import faiss
+from glob import glob
+from tqdm import tqdm
+import numpy as np
+import argparse
+import json
+
+def build_index(embedding_files: str,
+ index_name: str):
+
+ index = faiss.IndexFlatL2(1536)
+ file_list = sorted(glob(embedding_files))
+
+ for embedding_file in file_list:
+ print(embedding_file)
+ with open(embedding_file, 'rt', encoding='utf-8') as file:
+ embeddings = []
+ l = 0
+ for line in tqdm(file):
+ # Parse each JSON line
+ data = json.loads(line)
+ embeddings.append(data)
+ l += 1
+ data = np.array(embeddings).astype('float32')
+ print(data.shape)
+ try:
+ index.add(data)
+ except Exception as e:
+ print(data)
+ raise e
+
+ faiss.write_index(index, index_name)
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument('--embedding_files', type=str, help='embedding_filepaths glob expression')
+ parser.add_argument('--output_index_file', type=str, help='output filepath')
+ args = parser.parse_args()
+
+ build_index(
+ embedding_files=args.embedding_files,
+ index_name=args.output_index_file
+ )
\ No newline at end of file
diff --git a/memgpt/personas/examples/docqa/generate_embeddings_for_docs.py b/memgpt/personas/examples/docqa/generate_embeddings_for_docs.py
new file mode 100644
index 00000000..f377ce27
--- /dev/null
+++ b/memgpt/personas/examples/docqa/generate_embeddings_for_docs.py
@@ -0,0 +1,132 @@
+import asyncio
+import json
+import os
+import logging
+import sys
+import argparse
+
+from tqdm import tqdm
+import openai
+try:
+ from dotenv import load_dotenv
+ load_dotenv()
+except ModuleNotFoundError:
+ pass
+openai.api_key = os.getenv('OPENAI_API_KEY')
+
+sys.path.append("../../../")
+from openai_tools import async_get_embedding_with_backoff
+from openai_parallel_request_processor import process_api_requests_from_file
+
+
+# some settings specific to our own OpenAI org limits
+# (specific to text-embedding-ada-002)
+TPM_LIMIT = 1000000
+RPM_LIMIT = 3000
+
+DEFAULT_FILE = 'iclr/data/qa_data/30_total_documents/nq-open-30_total_documents_gold_at_0.jsonl.gz'
+EMBEDDING_MODEL = 'text-embedding-ada-002'
+
+
+async def generate_requests_file(filename):
+ """Generate a file of requests, which we can feed to a pre-made openai cookbook function"""
+ base_name = os.path.splitext(filename)[0]
+ requests_filename = f"{base_name}_embedding_requests.jsonl"
+
+ with open(filename, 'r') as f:
+ all_data = [json.loads(line) for line in f]
+
+ with open(requests_filename, 'w') as f:
+ for data in all_data:
+ documents = data
+ for idx, doc in enumerate(documents):
+ title = doc["title"]
+ text = doc["text"]
+ document_string = f"Document [{idx+1}] (Title: {title}) {text}"
+ request = {
+ "model": EMBEDDING_MODEL,
+ "input": document_string
+ }
+ json_string = json.dumps(request)
+ f.write(json_string + "\n")
+
+ # Run your parallel processing function
+ input(f"Generated requests file ({requests_filename}), continue with embedding batch requests? (hit enter)")
+ await process_api_requests_from_file(
+ requests_filepath=requests_filename,
+ save_filepath=f"{base_name}.embeddings.jsonl.gz", # Adjust as necessary
+ request_url="https://api.openai.com/v1/embeddings",
+ api_key=os.getenv('OPENAI_API_KEY'),
+ max_requests_per_minute=RPM_LIMIT,
+ max_tokens_per_minute=TPM_LIMIT,
+ token_encoding_name=EMBEDDING_MODEL,
+ max_attempts=5,
+ logging_level=logging.INFO,
+ )
+
+
+async def generate_embedding_file(filename, parallel_mode=False):
+ if parallel_mode:
+ await generate_requests_file(filename)
+ return
+
+ # Derive the sister filename
+ # base_name = os.path.splitext(filename)[0]
+ base_name = filename.rsplit('.jsonl', 1)[0]
+ sister_filename = f"{base_name}.embeddings.jsonl"
+
+ # Check if the sister file already exists
+ if os.path.exists(sister_filename):
+ print(f"{sister_filename} already exists. Skipping embedding generation.")
+ return
+
+ with open(filename, 'rt') as f:
+ all_data = [json.loads(line) for line in f]
+
+ embedding_data = []
+ total_documents = sum(len(data) for data in all_data)
+
+ # Outer loop progress bar
+ for i, data in enumerate(tqdm(all_data, desc="Processing data", total=len(all_data))):
+ documents = data
+ # Inner loop progress bar
+ for idx, doc in enumerate(tqdm(documents, desc=f"Embedding documents for data {i+1}/{len(all_data)}", total=len(documents), leave=False)):
+ title = doc["title"]
+ text = doc["text"]
+ document_string = f"[Title: {title}] {text}"
+ try:
+ embedding = await async_get_embedding_with_backoff(document_string, model=EMBEDDING_MODEL)
+ except Exception as e:
+ print(document_string)
+ raise e
+ embedding_data.append(embedding)
+
+ # Save the embeddings to the sister file
+ # with gzip.open(sister_filename, 'wt') as f:
+ with open(sister_filename, 'wb') as f:
+ for embedding in embedding_data:
+ # f.write(json.dumps(embedding) + '\n')
+ f.write((json.dumps(embedding) + '\n').encode('utf-8'))
+
+ print(f"Embeddings saved to {sister_filename}")
+
+
+async def main():
+ if len(sys.argv) > 1:
+ filename = sys.argv[1]
+ else:
+ filename = DEFAULT_FILE
+ await generate_embedding_file(filename)
+
+async def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument("filename", nargs="?", default=DEFAULT_FILE, help="Path to the input file")
+ parser.add_argument("--parallel", action="store_true", help="Enable parallel mode")
+ args = parser.parse_args()
+
+ await generate_embedding_file(args.filename, parallel_mode=args.parallel)
+
+
+if __name__ == "__main__":
+ loop = asyncio.get_event_loop()
+ loop.run_until_complete(main())
\ No newline at end of file
diff --git a/memgpt/personas/examples/docqa/openai_parallel_request_processor.py b/memgpt/personas/examples/docqa/openai_parallel_request_processor.py
new file mode 100644
index 00000000..4b9a1aae
--- /dev/null
+++ b/memgpt/personas/examples/docqa/openai_parallel_request_processor.py
@@ -0,0 +1,505 @@
+"""
+API REQUEST PARALLEL PROCESSOR
+
+Using the OpenAI API to process lots of text quickly takes some care.
+If you trickle in a million API requests one by one, they'll take days to complete.
+If you flood a million API requests in parallel, they'll exceed the rate limits and fail with errors.
+To maximize throughput, parallel requests need to be throttled to stay under rate limits.
+
+This script parallelizes requests to the OpenAI API while throttling to stay under rate limits.
+
+Features:
+- Streams requests from file, to avoid running out of memory for giant jobs
+- Makes requests concurrently, to maximize throughput
+- Throttles request and token usage, to stay under rate limits
+- Retries failed requests up to {max_attempts} times, to avoid missing data
+- Logs errors, to diagnose problems with requests
+
+Example command to call script:
+```
+python examples/api_request_parallel_processor.py \
+ --requests_filepath examples/data/example_requests_to_parallel_process.jsonl \
+ --save_filepath examples/data/example_requests_to_parallel_process_results.jsonl \
+ --request_url https://api.openai.com/v1/embeddings \
+ --max_requests_per_minute 1500 \
+ --max_tokens_per_minute 6250000 \
+ --token_encoding_name cl100k_base \
+ --max_attempts 5 \
+ --logging_level 20
+```
+
+Inputs:
+- requests_filepath : str
+ - path to the file containing the requests to be processed
+ - file should be a jsonl file, where each line is a json object with API parameters and an optional metadata field
+ - e.g., {"model": "text-embedding-ada-002", "input": "embed me", "metadata": {"row_id": 1}}
+ - as with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically)
+ - an example file is provided at examples/data/example_requests_to_parallel_process.jsonl
+ - the code to generate the example file is appended to the bottom of this script
+- save_filepath : str, optional
+ - path to the file where the results will be saved
+ - file will be a jsonl file, where each line is an array with the original request plus the API response
+ - e.g., [{"model": "text-embedding-ada-002", "input": "embed me"}, {...}]
+ - if omitted, results will be saved to {requests_filename}_results.jsonl
+- request_url : str, optional
+ - URL of the API endpoint to call
+ - if omitted, will default to "https://api.openai.com/v1/embeddings"
+- api_key : str, optional
+ - API key to use
+ - if omitted, the script will attempt to read it from an environment variable {os.getenv("OPENAI_API_KEY")}
+- max_requests_per_minute : float, optional
+ - target number of requests to make per minute (will make less if limited by tokens)
+ - leave headroom by setting this to 50% or 75% of your limit
+ - if requests are limiting you, try batching multiple embeddings or completions into one request
+ - if omitted, will default to 1,500
+- max_tokens_per_minute : float, optional
+ - target number of tokens to use per minute (will use less if limited by requests)
+ - leave headroom by setting this to 50% or 75% of your limit
+ - if omitted, will default to 125,000
+- token_encoding_name : str, optional
+ - name of the token encoding used, as defined in the `tiktoken` package
+ - if omitted, will default to "cl100k_base" (used by `text-embedding-ada-002`)
+- max_attempts : int, optional
+ - number of times to retry a failed request before giving up
+ - if omitted, will default to 5
+- logging_level : int, optional
+ - level of logging to use; higher numbers will log fewer messages
+ - 40 = ERROR; will log only when requests fail after all retries
+    - 30 = WARNING; will log when requests hit rate limits or other errors
+ - 20 = INFO; will log when requests start and the status at finish
+ - 10 = DEBUG; will log various things as the loop runs to see when they occur
+ - if omitted, will default to 20 (INFO).
+
+The script is structured as follows:
+ - Imports
+ - Define main()
+ - Initialize things
+ - In main loop:
+ - Get next request if one is not already waiting for capacity
+ - Update available token & request capacity
+ - If enough capacity available, call API
+ - The loop pauses if a rate limit error is hit
+ - The loop breaks when no tasks remain
+ - Define dataclasses
+ - StatusTracker (stores script metadata counters; only one instance is created)
+ - APIRequest (stores API inputs, outputs, metadata; one method to call API)
+ - Define functions
+ - api_endpoint_from_url (extracts API endpoint from request URL)
+ - append_to_jsonl (writes to results file)
+ - num_tokens_consumed_from_request (bigger function to infer token usage from request)
+ - task_id_generator_function (yields 1, 2, 3, ...)
+ - Run main()
+"""
+
+# imports
+import aiohttp # for making API calls concurrently
+import argparse # for running script from command line
+import asyncio # for running API calls concurrently
+import json # for saving results to a jsonl file
+import logging # for logging rate limit warnings and other messages
+import os # for reading API key
+import re # for matching endpoint from request URL
+import tiktoken # for counting tokens
+import time # for sleeping after rate limit is hit
+from dataclasses import (
+ dataclass,
+ field,
+) # for storing API inputs, outputs, and metadata
+
+
+async def process_api_requests_from_file(
+ requests_filepath: str,
+ save_filepath: str,
+ request_url: str,
+ api_key: str,
+ max_requests_per_minute: float,
+ max_tokens_per_minute: float,
+ token_encoding_name: str,
+ max_attempts: int,
+ logging_level: int,
+):
+ """Processes API requests in parallel, throttling to stay under rate limits."""
+ # constants
+ seconds_to_pause_after_rate_limit_error = 15
+ seconds_to_sleep_each_loop = (
+ 0.001 # 1 ms limits max throughput to 1,000 requests per second
+ )
+
+ # initialize logging
+ logging.basicConfig(level=logging_level)
+ logging.debug(f"Logging initialized at level {logging_level}")
+
+ # infer API endpoint and construct request header
+ api_endpoint = api_endpoint_from_url(request_url)
+ request_header = {"Authorization": f"Bearer {api_key}"}
+
+ # initialize trackers
+ queue_of_requests_to_retry = asyncio.Queue()
+ task_id_generator = (
+ task_id_generator_function()
+ ) # generates integer IDs of 1, 2, 3, ...
+ status_tracker = (
+ StatusTracker()
+ ) # single instance to track a collection of variables
+ next_request = None # variable to hold the next request to call
+
+ # initialize available capacity counts
+ available_request_capacity = max_requests_per_minute
+ available_token_capacity = max_tokens_per_minute
+ last_update_time = time.time()
+
+ # initialize flags
+ file_not_finished = True # after file is empty, we'll skip reading it
+ logging.debug(f"Initialization complete.")
+
+ # initialize file reading
+ with open(requests_filepath) as file:
+ # `requests` will provide requests one at a time
+ requests = file.__iter__()
+ logging.debug(f"File opened. Entering main loop")
+ async with aiohttp.ClientSession() as session: # Initialize ClientSession here
+ while True:
+ # get next request (if one is not already waiting for capacity)
+ if next_request is None:
+ if not queue_of_requests_to_retry.empty():
+ next_request = queue_of_requests_to_retry.get_nowait()
+ logging.debug(
+ f"Retrying request {next_request.task_id}: {next_request}"
+ )
+ elif file_not_finished:
+ try:
+ # get new request
+ request_json = json.loads(next(requests))
+ next_request = APIRequest(
+ task_id=next(task_id_generator),
+ request_json=request_json,
+ token_consumption=num_tokens_consumed_from_request(
+ request_json, api_endpoint, token_encoding_name
+ ),
+ attempts_left=max_attempts,
+ metadata=request_json.pop("metadata", None),
+ )
+ status_tracker.num_tasks_started += 1
+ status_tracker.num_tasks_in_progress += 1
+ logging.debug(
+ f"Reading request {next_request.task_id}: {next_request}"
+ )
+ except StopIteration:
+ # if file runs out, set flag to stop reading it
+ logging.debug("Read file exhausted")
+ file_not_finished = False
+
+ # update available capacity
+ current_time = time.time()
+ seconds_since_update = current_time - last_update_time
+ available_request_capacity = min(
+ available_request_capacity
+ + max_requests_per_minute * seconds_since_update / 60.0,
+ max_requests_per_minute,
+ )
+ available_token_capacity = min(
+ available_token_capacity
+ + max_tokens_per_minute * seconds_since_update / 60.0,
+ max_tokens_per_minute,
+ )
+ last_update_time = current_time
+
+ # if enough capacity available, call API
+ if next_request:
+ next_request_tokens = next_request.token_consumption
+ if (
+ available_request_capacity >= 1
+ and available_token_capacity >= next_request_tokens
+ ):
+ # update counters
+ available_request_capacity -= 1
+ available_token_capacity -= next_request_tokens
+ next_request.attempts_left -= 1
+
+ # call API
+ asyncio.create_task(
+ next_request.call_api(
+ session=session,
+ request_url=request_url,
+ request_header=request_header,
+ retry_queue=queue_of_requests_to_retry,
+ save_filepath=save_filepath,
+ status_tracker=status_tracker,
+ )
+ )
+ next_request = None # reset next_request to empty
+
+ # if all tasks are finished, break
+ if status_tracker.num_tasks_in_progress == 0:
+ break
+
+ # main loop sleeps briefly so concurrent tasks can run
+ await asyncio.sleep(seconds_to_sleep_each_loop)
+
+ # if a rate limit error was hit recently, pause to cool down
+ seconds_since_rate_limit_error = (
+ time.time() - status_tracker.time_of_last_rate_limit_error
+ )
+ if (
+ seconds_since_rate_limit_error
+ < seconds_to_pause_after_rate_limit_error
+ ):
+ remaining_seconds_to_pause = (
+ seconds_to_pause_after_rate_limit_error
+ - seconds_since_rate_limit_error
+ )
+ await asyncio.sleep(remaining_seconds_to_pause)
+ # ^e.g., if pause is 15 seconds and final limit was hit 5 seconds ago
+ logging.warn(
+ f"Pausing to cool down until {time.ctime(status_tracker.time_of_last_rate_limit_error + seconds_to_pause_after_rate_limit_error)}"
+ )
+
+ # after finishing, log final status
+ logging.info(
+ f"""Parallel processing complete. Results saved to {save_filepath}"""
+ )
+ if status_tracker.num_tasks_failed > 0:
+ logging.warning(
+ f"{status_tracker.num_tasks_failed} / {status_tracker.num_tasks_started} requests failed. Errors logged to {save_filepath}."
+ )
+ if status_tracker.num_rate_limit_errors > 0:
+ logging.warning(
+ f"{status_tracker.num_rate_limit_errors} rate limit errors received. Consider running at a lower rate."
+ )
+
+
+# dataclasses
+
+
+@dataclass
+class StatusTracker:
+ """Stores metadata about the script's progress. Only one instance is created."""
+
+ num_tasks_started: int = 0
+ num_tasks_in_progress: int = 0 # script ends when this reaches 0
+ num_tasks_succeeded: int = 0
+ num_tasks_failed: int = 0
+ num_rate_limit_errors: int = 0
+ num_api_errors: int = 0 # excluding rate limit errors, counted above
+ num_other_errors: int = 0
+ time_of_last_rate_limit_error: int = 0 # used to cool off after hitting rate limits
+
+
+@dataclass
+class APIRequest:
+ """Stores an API request's inputs, outputs, and other metadata. Contains a method to make an API call."""
+
+ task_id: int
+ request_json: dict
+ token_consumption: int
+ attempts_left: int
+ metadata: dict
+ result: list = field(default_factory=list)
+
+ async def call_api(
+ self,
+ session: aiohttp.ClientSession,
+ request_url: str,
+ request_header: dict,
+ retry_queue: asyncio.Queue,
+ save_filepath: str,
+ status_tracker: StatusTracker,
+ ):
+ """Calls the OpenAI API and saves results."""
+ logging.info(f"Starting request #{self.task_id}")
+ error = None
+ try:
+ async with session.post(
+ url=request_url, headers=request_header, json=self.request_json
+ ) as response:
+ response = await response.json()
+ if "error" in response:
+ logging.warning(
+ f"Request {self.task_id} failed with error {response['error']}"
+ )
+ status_tracker.num_api_errors += 1
+ error = response
+ if "Rate limit" in response["error"].get("message", ""):
+ status_tracker.time_of_last_rate_limit_error = time.time()
+ status_tracker.num_rate_limit_errors += 1
+ status_tracker.num_api_errors -= (
+ 1 # rate limit errors are counted separately
+ )
+
+ except (
+ Exception
+ ) as e: # catching naked exceptions is bad practice, but in this case we'll log & save them
+ logging.warning(f"Request {self.task_id} failed with Exception {e}")
+ status_tracker.num_other_errors += 1
+ error = e
+ if error:
+ self.result.append(error)
+ if self.attempts_left:
+ retry_queue.put_nowait(self)
+ else:
+ logging.error(
+ f"Request {self.request_json} failed after all attempts. Saving errors: {self.result}"
+ )
+ data = (
+ [self.request_json, [str(e) for e in self.result], self.metadata]
+ if self.metadata
+ else [self.request_json, [str(e) for e in self.result]]
+ )
+ append_to_jsonl(data, save_filepath)
+ status_tracker.num_tasks_in_progress -= 1
+ status_tracker.num_tasks_failed += 1
+ else:
+ data = (
+ [self.request_json, response, self.metadata]
+ if self.metadata
+ else [self.request_json, response]
+ )
+ append_to_jsonl(data, save_filepath)
+ status_tracker.num_tasks_in_progress -= 1
+ status_tracker.num_tasks_succeeded += 1
+ logging.debug(f"Request {self.task_id} saved to {save_filepath}")
+
+
+# functions
+
+
+def api_endpoint_from_url(request_url):
+ """Extract the API endpoint from the request URL."""
+ match = re.search("^https://[^/]+/v\\d+/(.+)$", request_url)
+ return match[1]
+
+
+def append_to_jsonl(data, filename: str) -> None:
+ """Append a json payload to the end of a jsonl file."""
+ json_string = json.dumps(data)
+ with open(filename, "a") as f:
+ f.write(json_string + "\n")
+
+
def num_tokens_consumed_from_request(
    request_json: dict,
    api_endpoint: str,
    token_encoding_name: str,
):
    """Count the number of tokens in the request.

    Only supports completion (including chat) and embedding requests.

    Args:
        request_json: the request payload (e.g. {"messages": ...} or {"input": ...}).
        api_endpoint: endpoint path, e.g. "chat/completions" or "embeddings".
        token_encoding_name: a tiktoken encoding name (a model name of
            'text-embedding-ada-002' is special-cased to cl100k_base).

    Raises:
        TypeError: if a prompt/input field is neither a string nor a list of strings.
        NotImplementedError: for unsupported endpoints.
    """
    # 'text-embedding-ada-002' is a model name, not an encoding name; it uses cl100k_base.
    if token_encoding_name == 'text-embedding-ada-002':
        encoding = tiktoken.get_encoding('cl100k_base')
    else:
        encoding = tiktoken.get_encoding(token_encoding_name)
    # if completions request, tokens = prompt + n * max_tokens
    if api_endpoint.endswith("completions"):
        max_tokens = request_json.get("max_tokens", 15)
        n = request_json.get("n", 1)
        completion_tokens = n * max_tokens

        # chat completions
        if api_endpoint.startswith("chat/"):
            num_tokens = 0
            for message in request_json["messages"]:
                num_tokens += 4  # every message follows {role/name}\n{content}\n
                for key, value in message.items():
                    num_tokens += len(encoding.encode(value))
                    if key == "name":  # if there's a name, the role is omitted
                        num_tokens -= 1  # role is always required and always 1 token
            num_tokens += 2  # every reply is primed with assistant
            return num_tokens + completion_tokens
        # normal completions
        else:
            prompt = request_json["prompt"]
            if isinstance(prompt, str):  # single prompt
                return len(encoding.encode(prompt)) + completion_tokens
            elif isinstance(prompt, list):  # multiple prompts
                prompt_tokens = sum(len(encoding.encode(p)) for p in prompt)
                return prompt_tokens + completion_tokens * len(prompt)
            else:
                raise TypeError(
                    'Expecting either string or list of strings for "prompt" field in completion request'
                )
    # if embeddings request, tokens = input tokens
    elif api_endpoint == "embeddings":
        embedding_input = request_json["input"]  # renamed: don't shadow builtin `input`
        if isinstance(embedding_input, str):  # single input
            return len(encoding.encode(embedding_input))
        elif isinstance(embedding_input, list):  # multiple inputs
            return sum(len(encoding.encode(i)) for i in embedding_input)
        else:
            # BUGFIX: message previously said "inputs"; the request field is "input".
            raise TypeError(
                'Expecting either string or list of strings for "input" field in embedding request'
            )
    # more logic needed to support other API calls (e.g., edits, inserts, DALL-E)
    else:
        raise NotImplementedError(
            f'API endpoint "{api_endpoint}" not implemented in this script'
        )
+
+
def task_id_generator_function():
    """Yield sequential integer task IDs: 0, 1, 2, ..."""
    next_id = 0
    while True:
        yield next_id
        next_id += 1
+
+
+# run script
+
+
if __name__ == "__main__":
    # Parse command line arguments.
    parser = argparse.ArgumentParser()
    # Required: the original allowed it to be omitted and then crashed later
    # with an AttributeError when deriving save_filepath.
    parser.add_argument("--requests_filepath", required=True,
                        help="path to a .jsonl file of API request payloads")
    parser.add_argument("--save_filepath", default=None,
                        help="where to write results (defaults to <requests_filepath>_results.jsonl)")
    parser.add_argument("--request_url", default="https://api.openai.com/v1/embeddings")
    parser.add_argument("--api_key", default=os.getenv("OPENAI_API_KEY"))
    # Rate limits are floats (defaults are half the account limit for headroom);
    # the original declared type=int, which rejected fractional CLI values and
    # contradicted the float defaults. type=float still accepts plain integers.
    parser.add_argument("--max_requests_per_minute", type=float, default=3_000 * 0.5)
    parser.add_argument("--max_tokens_per_minute", type=float, default=250_000 * 0.5)
    parser.add_argument("--token_encoding_name", default="cl100k_base")
    parser.add_argument("--max_attempts", type=int, default=5)
    parser.add_argument("--logging_level", type=int, default=logging.INFO)
    args = parser.parse_args()

    if args.save_filepath is None:
        args.save_filepath = args.requests_filepath.replace(".jsonl", "_results.jsonl")

    # run script
    asyncio.run(
        process_api_requests_from_file(
            requests_filepath=args.requests_filepath,
            save_filepath=args.save_filepath,
            request_url=args.request_url,
            api_key=args.api_key,
            max_requests_per_minute=float(args.max_requests_per_minute),
            max_tokens_per_minute=float(args.max_tokens_per_minute),
            token_encoding_name=args.token_encoding_name,
            max_attempts=int(args.max_attempts),
            logging_level=int(args.logging_level),
        )
    )
+
+
+"""
+APPENDIX
+
+The example requests file at openai-cookbook/examples/data/example_requests_to_parallel_process.jsonl contains 10,000 requests to text-embedding-ada-002.
+
+It was generated with the following code:
+
+```python
+import json
+
+filename = "data/example_requests_to_parallel_process.jsonl"
+n_requests = 10_000
+jobs = [{"model": "text-embedding-ada-002", "input": str(x) + "\n"} for x in range(n_requests)]
+with open(filename, "w") as f:
+ for job in jobs:
+ json_string = json.dumps(job)
+ f.write(json_string + "\n")
+```
+
+As with all jsonl files, take care that newlines in the content are properly escaped (json.dumps does this automatically).
+"""
\ No newline at end of file
diff --git a/memgpt/personas/examples/docqa/scrape_docs.py b/memgpt/personas/examples/docqa/scrape_docs.py
new file mode 100644
index 00000000..024558b0
--- /dev/null
+++ b/memgpt/personas/examples/docqa/scrape_docs.py
@@ -0,0 +1,71 @@
+import os
+import re
+import tiktoken
+import json
+
+# Define the directory where the documentation resides
+docs_dir = 'text'
+
+encoding = tiktoken.encoding_for_model("gpt-4")
+PASSAGE_TOKEN_LEN = 800
+
def extract_text_from_sphinx_txt(file_path):
    """Split a Sphinx-generated .txt document into ~PASSAGE_TOKEN_LEN-token passages.

    The first (stripped) non-empty line becomes the document title. Blank lines
    and pure section-underline rules (----, ====, ****) are discarded.

    Returns:
        A list of dicts with 'title', 'text', and 'num_tokens' keys.
    """
    lines = []
    title = ""
    with open(file_path, 'r', encoding='utf-8') as file:
        for line in file:
            if not title:
                title = line.strip()
                continue
            # Keep lines with visible content that are not underline rules.
            if line and re.match(r'^.*\S.*$', line) and not re.match(r'^[-=*]+$', line):
                lines.append(line)
    passages = []
    curr_passage = []
    curr_token_ct = 0
    for line in lines:
        try:
            line_token_ct = len(encoding.encode(line, allowed_special={'<|endoftext|>'}))
        except Exception as e:
            print("line", line)
            raise e
        if line_token_ct > PASSAGE_TOKEN_LEN:
            # A single over-long line becomes its own (truncated) passage.
            passages.append({
                'title': title,
                'text': line[:3200],
                # BUGFIX: was curr_token_ct (the running count of the current
                # passage), which is unrelated to this line.
                'num_tokens': line_token_ct,
            })
            continue
        # BUGFIX: the original never appended lines to curr_passage, so every
        # accumulated passage had empty text and the final flush never fired.
        curr_passage.append(line)
        curr_token_ct += line_token_ct
        if curr_token_ct > PASSAGE_TOKEN_LEN:
            passages.append({
                'title': title,
                'text': ''.join(curr_passage),
                'num_tokens': curr_token_ct
            })
            curr_passage = []
            curr_token_ct = 0

    # Flush the trailing partial passage, if any.
    if len(curr_passage) > 0:
        passages.append({
            'title': title,
            'text': ''.join(curr_passage),
            'num_tokens': curr_token_ct
        })
    return passages
+
# Walk the docs directory (and all subdirectories) and chunk every .txt file.
passages = []
total_files = 0
for subdir, _, files in os.walk(docs_dir):
    for fname in files:
        if not fname.endswith('.txt'):
            continue
        passages.append(extract_text_from_sphinx_txt(os.path.join(subdir, fname)))
        total_files += 1
print("total .txt files:", total_files)

# One JSON line per source document; each line holds that document's passage list.
with open('all_docs.jsonl', 'w', encoding='utf-8') as file:
    for doc_passages in passages:
        file.write(json.dumps(doc_passages))
        file.write('\n')
diff --git a/memgpt/personas/examples/memgpt_doc.txt b/memgpt/personas/examples/memgpt_doc.txt
index 0a850b99..9af2c7f6 100644
--- a/memgpt/personas/examples/memgpt_doc.txt
+++ b/memgpt/personas/examples/memgpt_doc.txt
@@ -1,3 +1,6 @@
My name is MemGPT.
I am an AI assistant designed to help human users with document analysis.
-I can use this space in my core memory to keep track of my current tasks and goals.
\ No newline at end of file
+I can use this space in my core memory to keep track of my current tasks and goals.
+
The answer to the human's question will usually be located somewhere in my archival memory, so I should keep paging through results until I find enough information to construct an answer.
I will not respond to the human until I have arrived at an answer.
\ No newline at end of file
diff --git a/memgpt/utils.py b/memgpt/utils.py
index d9b1a4ae..e008cd93 100644
--- a/memgpt/utils.py
+++ b/memgpt/utils.py
@@ -4,6 +4,8 @@ import demjson3 as demjson
import numpy as np
import json
import pytz
+import os
+import faiss
# DEBUG = True
@@ -61,3 +63,20 @@ def parse_json(string):
except demjson.JSONDecodeError as e:
print(f"Error parsing json with demjson package: {e}")
raise e
+
def prepare_archival_index(folder):
    """Load the FAISS index and passage database stored under *folder*.

    Expects two files produced by the docqa preprocessing scripts:
    ``all_docs.index`` (a FAISS index) and ``all_docs.jsonl`` (one JSON list of
    passages per document, each passage a dict with 'title' and 'text' keys —
    assumed schema; confirm against the scraping script).

    Returns:
        (index, archival_database): the FAISS index and a flat list of
        {'content', 'timestamp'} records, one per passage.
    """
    index = faiss.read_index(os.path.join(folder, "all_docs.index"))

    archival_database = []
    with open(os.path.join(folder, "all_docs.jsonl"), 'rt') as f:
        docs = [json.loads(line) for line in f]
    for doc in docs:
        total = len(doc)
        for i, passage in enumerate(doc):
            record = {
                'content': f"[Title: {passage['title']}, {i}/{total}] {passage['text']}",
                'timestamp': get_local_time(),
            }
            archival_database.append(record)
    return index, archival_database
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 4176044a..05112350 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,4 +9,5 @@ demjson3
tiktoken
numpy
absl-py
-pybars3
\ No newline at end of file
+pybars3
+faiss-cpu