diff --git a/letta/llm_api/anthropic.py b/letta/llm_api/anthropic.py index 2c35cfdc..f365a052 100644 --- a/letta/llm_api/anthropic.py +++ b/letta/llm_api/anthropic.py @@ -1,7 +1,8 @@ import json import re import time -from typing import Generator, List, Optional, Tuple, Union +import warnings +from typing import Generator, List, Optional, Union import anthropic from anthropic import PermissionDeniedError @@ -36,7 +37,7 @@ from letta.schemas.openai.chat_completion_response import MessageDelta, ToolCall from letta.services.provider_manager import ProviderManager from letta.settings import model_settings from letta.streaming_interface import AgentChunkStreamingInterface, AgentRefreshStreamingInterface -from letta.utils import get_utc_time, smart_urljoin +from letta.utils import get_utc_time BASE_URL = "https://api.anthropic.com/v1" @@ -567,30 +568,6 @@ def _prepare_anthropic_request( return data -def get_anthropic_endpoint_and_headers( - base_url: str, - api_key: str, - version: str = "2023-06-01", - beta: Optional[str] = "tools-2024-04-04", -) -> Tuple[str, dict]: - """ - Dynamically generate the Anthropic endpoint and headers. - """ - url = smart_urljoin(base_url, "messages") - - headers = { - "Content-Type": "application/json", - "x-api-key": api_key, - "anthropic-version": version, - } - - # Add beta header if specified - if beta: - headers["anthropic-beta"] = beta - - return url, headers - - def anthropic_chat_completions_request( data: ChatCompletionRequest, inner_thoughts_xml_tag: Optional[str] = "thinking", diff --git a/letta/llm_api/llm_api_tools.py b/letta/llm_api/llm_api_tools.py index 1131cd76..0d423677 100644 --- a/letta/llm_api/llm_api_tools.py +++ b/letta/llm_api/llm_api_tools.py @@ -29,7 +29,6 @@ from letta.schemas.openai.chat_completion_request import ChatCompletionRequest, from letta.schemas.openai.chat_completion_response import ChatCompletionResponse from letta.settings import ModelSettings from letta.streaming_interface import AgentChunkStreamingInterface, AgentRefreshStreamingInterface -from letta.utils import run_async_task LLM_API_PROVIDER_OPTIONS = ["openai", "azure", "anthropic", "google_ai", "cohere", "local", "groq"] @@ -57,7 +56,9 @@ def retry_with_exponential_backoff( while True: try: return func(*args, **kwargs) - + except KeyboardInterrupt: + # Stop retrying if user hits Ctrl-C + raise KeyboardInterrupt("User intentionally stopped thread. Stopping...") except requests.exceptions.HTTPError as http_err: if not hasattr(http_err, "response") or not http_err.response: @@ -162,25 +163,21 @@ def create( assert isinstance(stream_interface, AgentChunkStreamingInterface) or isinstance( stream_interface, AgentRefreshStreamingInterface ), type(stream_interface) - response = run_async_task( - openai_chat_completions_process_stream( - url=llm_config.model_endpoint, - api_key=api_key, - chat_completion_request=data, - stream_interface=stream_interface, - ) + response = openai_chat_completions_process_stream( + url=llm_config.model_endpoint, + api_key=api_key, + chat_completion_request=data, + stream_interface=stream_interface, ) else: # Client did not request token streaming (expect a blocking backend response) data.stream = False if isinstance(stream_interface, AgentChunkStreamingInterface): stream_interface.stream_start() try: - response = run_async_task( - openai_chat_completions_request( - url=llm_config.model_endpoint, - api_key=api_key, - chat_completion_request=data, - ) + response = openai_chat_completions_request( + url=llm_config.model_endpoint, + api_key=api_key, + chat_completion_request=data, ) finally: if isinstance(stream_interface, AgentChunkStreamingInterface): @@ -354,12 +351,10 @@ def create( stream_interface.stream_start() try: # groq uses the openai chat completions API, so this component should be reusable - response = run_async_task( - openai_chat_completions_request( - url=llm_config.model_endpoint, - api_key=model_settings.groq_api_key, - chat_completion_request=data, - ) + response = openai_chat_completions_request( + url=llm_config.model_endpoint, + api_key=model_settings.groq_api_key, + chat_completion_request=data, ) finally: if isinstance(stream_interface, AgentChunkStreamingInterface): diff --git a/letta/llm_api/openai.py b/letta/llm_api/openai.py index d931e8fb..30caecdd 100644 --- a/letta/llm_api/openai.py +++ b/letta/llm_api/openai.py @@ -1,8 +1,8 @@ import warnings -from typing import AsyncGenerator, List, Optional, Union +from typing import Generator, List, Optional, Union import requests -from openai import AsyncOpenAI +from openai import OpenAI from letta.llm_api.helpers import add_inner_thoughts_to_functions, convert_to_structured_output, make_post_request from letta.local_llm.constants import INNER_THOUGHTS_KWARG, INNER_THOUGHTS_KWARG_DESCRIPTION, INNER_THOUGHTS_KWARG_DESCRIPTION_GO_FIRST @@ -158,7 +158,7 @@ def build_openai_chat_completions_request( return data -async def openai_chat_completions_process_stream( +def openai_chat_completions_process_stream( url: str, api_key: str, chat_completion_request: ChatCompletionRequest, @@ -231,7 +231,7 @@ async def openai_chat_completions_process_stream( n_chunks = 0 # approx == n_tokens chunk_idx = 0 try: - async for chat_completion_chunk in openai_chat_completions_request_stream( + for chat_completion_chunk in openai_chat_completions_request_stream( url=url, api_key=api_key, chat_completion_request=chat_completion_request ): assert isinstance(chat_completion_chunk, ChatCompletionChunkResponse), type(chat_completion_chunk) @@ -382,24 +382,21 @@ async def openai_chat_completions_process_stream( return chat_completion_response -async def openai_chat_completions_request_stream( +def openai_chat_completions_request_stream( url: str, api_key: str, chat_completion_request: ChatCompletionRequest, -) -> AsyncGenerator[ChatCompletionChunkResponse, None]: +) -> Generator[ChatCompletionChunkResponse, None, None]: data = prepare_openai_payload(chat_completion_request) data["stream"] = True - client = AsyncOpenAI( - api_key=api_key, - base_url=url, - ) - stream = await client.chat.completions.create(**data) - async for chunk in stream: + client = OpenAI(api_key=api_key, base_url=url, max_retries=0) + stream = client.chat.completions.create(**data) + for chunk in stream: # TODO: Use the native OpenAI objects here? yield ChatCompletionChunkResponse(**chunk.model_dump(exclude_none=True)) -async def openai_chat_completions_request( +def openai_chat_completions_request( url: str, api_key: str, chat_completion_request: ChatCompletionRequest, @@ -412,8 +409,8 @@ async def openai_chat_completions_request( https://platform.openai.com/docs/guides/text-generation?lang=curl """ data = prepare_openai_payload(chat_completion_request) - client = AsyncOpenAI(api_key=api_key, base_url=url) - chat_completion = await client.chat.completions.create(**data) + client = OpenAI(api_key=api_key, base_url=url, max_retries=0) + chat_completion = client.chat.completions.create(**data) return ChatCompletionResponse(**chat_completion.model_dump())