From cde4af996eff98d0c33c80b76a92ce8b00c3e009 Mon Sep 17 00:00:00 2001 From: Andy Li <55300002+cliandy@users.noreply.github.com> Date: Wed, 7 May 2025 14:31:50 -0700 Subject: [PATCH] feat: run composio with an asynchronousclient via aiohttp (#2026) --- letta/functions/async_composio_toolset.py | 106 ++++++++++++++++++++++ letta/functions/composio_helpers.py | 50 +++++----- 2 files changed, 129 insertions(+), 27 deletions(-) create mode 100644 letta/functions/async_composio_toolset.py diff --git a/letta/functions/async_composio_toolset.py b/letta/functions/async_composio_toolset.py new file mode 100644 index 00000000..5ebd385f --- /dev/null +++ b/letta/functions/async_composio_toolset.py @@ -0,0 +1,106 @@ +import json +from typing import Any + +import aiohttp +from composio import ComposioToolSet as BaseComposioToolSet +from composio.exceptions import ( + ApiKeyNotProvidedError, + ComposioSDKError, + ConnectedAccountNotFoundError, + EnumMetadataNotFound, + EnumStringNotFound, +) + + +class AsyncComposioToolSet(BaseComposioToolSet, runtime="letta"): + """ + Async version of ComposioToolSet client for interacting with Composio API + Used to asynchronously hit the execute action endpoint + + https://docs.composio.dev/api-reference/api-reference/v3/tools/post-api-v-3-tools-execute-action + """ + + def __init__(self, api_key: str, entity_id: str, lock: bool = True): + """ + Initialize the AsyncComposioToolSet client + + Args: + api_key (str): Your Composio API key + entity_id (str): Your Composio entity ID + lock (bool): Whether to use locking (default: True) + """ + super().__init__(api_key=api_key, entity_id=entity_id, lock=lock) + + self.headers = { + "Content-Type": "application/json", + "X-API-Key": self._api_key, + } + + async def execute_action( + self, + action: str, + params: dict[str, Any] = {}, + ) -> dict[str, Any]: + """ + Execute an action asynchronously using the Composio API + + Args: + action (str): The name of the action to execute + params (dict[str, Any], optional): Parameters for the action + + Returns: + dict[str, Any]: The API response + + Raises: + ApiKeyNotProvidedError: if the API key is not provided + ComposioSDKError: if a general Composio SDK error occurs + ConnectedAccountNotFoundError: if the connected account is not found + EnumMetadataNotFound: if enum metadata is not found + EnumStringNotFound: if enum string is not found + aiohttp.ClientError: if a network-related error occurs + ValueError: if an error with the parameters or response occurs + """ + API_VERSION = "v3" + endpoint = f"{self._base_url}/{API_VERSION}/tools/execute/{action}" + + json_payload = { + "entity_id": self.entity_id, + "arguments": params or {}, + } + + try: + async with aiohttp.ClientSession() as session: + async with session.post(endpoint, headers=self.headers, json=json_payload) as response: + print(response, response.status, response.reason, response.content) + if response.status == 200: + return await response.json() + else: + error_text = await response.text() + try: + error_json = json.loads(error_text) + error_message = error_json.get("message", error_text) + error_code = error_json.get("code") + + # Handle specific error codes from Composio API + if error_code == 10401 or "API_KEY_NOT_FOUND" in error_message: + raise ApiKeyNotProvidedError() + if "connected account not found" in error_message.lower(): + raise ConnectedAccountNotFoundError(f"Connected account not found: {error_message}") + if "enum metadata not found" in error_message.lower(): + raise EnumMetadataNotFound(f"Enum metadata not found: {error_message}") + if "enum string not found" in error_message.lower(): + raise EnumStringNotFound(f"Enum string not found: {error_message}") + except json.JSONDecodeError: + error_message = error_text + + # If no specific error was identified, raise a general error + raise ValueError(f"API request failed with status {response.status}: {error_message}") + except aiohttp.ClientError as e: + # Wrap network errors in ComposioSDKError + raise ComposioSDKError(f"Network error when calling Composio API: {str(e)}") + except ValueError as e: + # Re-raise ValueError (which could be our custom error message or a JSON parsing error) + raise + except Exception as e: + # Catch any other exceptions and wrap them in ComposioSDKError + raise ComposioSDKError(f"Unexpected error when calling Composio API: {str(e)}") diff --git a/letta/functions/composio_helpers.py b/letta/functions/composio_helpers.py index ae5cbb35..40d49791 100644 --- a/letta/functions/composio_helpers.py +++ b/letta/functions/composio_helpers.py @@ -1,8 +1,6 @@ -import asyncio import os from typing import Any, Optional -from composio import ComposioToolSet from composio.constants import DEFAULT_ENTITY_ID from composio.exceptions import ( ApiKeyNotProvidedError, @@ -13,6 +11,8 @@ from composio.exceptions import ( ) from letta.constants import COMPOSIO_ENTITY_ENV_VAR_KEY +from letta.functions.async_composio_toolset import AsyncComposioToolSet +from letta.utils import run_async_task # TODO: This is kind of hacky, as this is used to search up the action later on composio's side @@ -61,36 +61,32 @@ def {func_name}(**kwargs): async def execute_composio_action_async( action_name: str, args: dict, api_key: Optional[str] = None, entity_id: Optional[str] = None ) -> tuple[str, str]: + entity_id = entity_id or os.getenv(COMPOSIO_ENTITY_ENV_VAR_KEY, DEFAULT_ENTITY_ID) + composio_toolset = AsyncComposioToolSet(api_key=api_key, entity_id=entity_id, lock=False) try: - loop = asyncio.get_running_loop() - return await loop.run_in_executor(None, execute_composio_action, action_name, args, api_key, entity_id) + response = await composio_toolset.execute_action(action=action_name, params=args) + except ApiKeyNotProvidedError as e: + raise RuntimeError(f"API key not provided or invalid for Composio action '{action_name}': {str(e)}") + except ConnectedAccountNotFoundError as e: + raise RuntimeError(f"Connected account not found for Composio action '{action_name}': {str(e)}") + except EnumMetadataNotFound as e: + raise RuntimeError(f"Enum metadata not found for Composio action '{action_name}': {str(e)}") + except EnumStringNotFound as e: + raise RuntimeError(f"Enum string not found for Composio action '{action_name}': {str(e)}") + except ComposioSDKError as e: + raise RuntimeError(f"Composio SDK error while executing action '{action_name}': {str(e)}") except Exception as e: - raise RuntimeError(f"Error in execute_composio_action_async: {e}") from e + print(type(e)) + raise RuntimeError(f"An unexpected error occurred in Composio SDK while executing action '{action_name}': {str(e)}") + + if "error" in response and response["error"]: + raise RuntimeError(f"Error while executing action '{action_name}': {str(response['error'])}") + + return response.get("data") def execute_composio_action(action_name: str, args: dict, api_key: Optional[str] = None, entity_id: Optional[str] = None) -> Any: - entity_id = entity_id or os.getenv(COMPOSIO_ENTITY_ENV_VAR_KEY, DEFAULT_ENTITY_ID) - try: - composio_toolset = ComposioToolSet(api_key=api_key, entity_id=entity_id, lock=False) - response = composio_toolset.execute_action(action=action_name, params=args) - except ApiKeyNotProvidedError: - raise RuntimeError( - f"Composio API key is missing for action '{action_name}'. " - "Please set the sandbox environment variables either through the ADE or the API." - ) - except ConnectedAccountNotFoundError: - raise RuntimeError(f"No connected account was found for action '{action_name}'. " "Please link an account and try again.") - except EnumStringNotFound as e: - raise RuntimeError(f"Invalid value provided for action '{action_name}': " + str(e) + ". Please check the action parameters.") - except EnumMetadataNotFound as e: - raise RuntimeError(f"Invalid value provided for action '{action_name}': " + str(e) + ". Please check the action parameters.") - except ComposioSDKError as e: - raise RuntimeError(f"An unexpected error occurred in Composio SDK while executing action '{action_name}': " + str(e)) - - if "error" in response and response["error"]: - raise RuntimeError(f"Error while executing action '{action_name}': " + str(response["error"])) - - return response.get("data") + return run_async_task(execute_composio_action_async(action_name, args, api_key, entity_id)) def _assert_code_gen_compilable(code_str):