feat: run composio with an asynchronousclient via aiohttp (#2026)

This commit is contained in:
Andy Li
2025-05-07 14:31:50 -07:00
committed by GitHub
parent f5b224602e
commit 6336c9dca4
2 changed files with 129 additions and 27 deletions

View File

@@ -0,0 +1,106 @@
import json
from typing import Any
import aiohttp
from composio import ComposioToolSet as BaseComposioToolSet
from composio.exceptions import (
ApiKeyNotProvidedError,
ComposioSDKError,
ConnectedAccountNotFoundError,
EnumMetadataNotFound,
EnumStringNotFound,
)
class AsyncComposioToolSet(BaseComposioToolSet, runtime="letta"):
"""
Async version of ComposioToolSet client for interacting with Composio API
Used to asynchronously hit the execute action endpoint
https://docs.composio.dev/api-reference/api-reference/v3/tools/post-api-v-3-tools-execute-action
"""
def __init__(self, api_key: str, entity_id: str, lock: bool = True):
"""
Initialize the AsyncComposioToolSet client
Args:
api_key (str): Your Composio API key
entity_id (str): Your Composio entity ID
lock (bool): Whether to use locking (default: True)
"""
super().__init__(api_key=api_key, entity_id=entity_id, lock=lock)
self.headers = {
"Content-Type": "application/json",
"X-API-Key": self._api_key,
}
async def execute_action(
self,
action: str,
params: dict[str, Any] = {},
) -> dict[str, Any]:
"""
Execute an action asynchronously using the Composio API
Args:
action (str): The name of the action to execute
params (dict[str, Any], optional): Parameters for the action
Returns:
dict[str, Any]: The API response
Raises:
ApiKeyNotProvidedError: if the API key is not provided
ComposioSDKError: if a general Composio SDK error occurs
ConnectedAccountNotFoundError: if the connected account is not found
EnumMetadataNotFound: if enum metadata is not found
EnumStringNotFound: if enum string is not found
aiohttp.ClientError: if a network-related error occurs
ValueError: if an error with the parameters or response occurs
"""
API_VERSION = "v3"
endpoint = f"{self._base_url}/{API_VERSION}/tools/execute/{action}"
json_payload = {
"entity_id": self.entity_id,
"arguments": params or {},
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(endpoint, headers=self.headers, json=json_payload) as response:
print(response, response.status, response.reason, response.content)
if response.status == 200:
return await response.json()
else:
error_text = await response.text()
try:
error_json = json.loads(error_text)
error_message = error_json.get("message", error_text)
error_code = error_json.get("code")
# Handle specific error codes from Composio API
if error_code == 10401 or "API_KEY_NOT_FOUND" in error_message:
raise ApiKeyNotProvidedError()
if "connected account not found" in error_message.lower():
raise ConnectedAccountNotFoundError(f"Connected account not found: {error_message}")
if "enum metadata not found" in error_message.lower():
raise EnumMetadataNotFound(f"Enum metadata not found: {error_message}")
if "enum string not found" in error_message.lower():
raise EnumStringNotFound(f"Enum string not found: {error_message}")
except json.JSONDecodeError:
error_message = error_text
# If no specific error was identified, raise a general error
raise ValueError(f"API request failed with status {response.status}: {error_message}")
except aiohttp.ClientError as e:
# Wrap network errors in ComposioSDKError
raise ComposioSDKError(f"Network error when calling Composio API: {str(e)}")
except ValueError as e:
# Re-raise ValueError (which could be our custom error message or a JSON parsing error)
raise
except Exception as e:
# Catch any other exceptions and wrap them in ComposioSDKError
raise ComposioSDKError(f"Unexpected error when calling Composio API: {str(e)}")

View File

@@ -1,8 +1,6 @@
import asyncio
import os
from typing import Any, Optional
from composio import ComposioToolSet
from composio.constants import DEFAULT_ENTITY_ID
from composio.exceptions import (
ApiKeyNotProvidedError,
@@ -13,6 +11,8 @@ from composio.exceptions import (
)
from letta.constants import COMPOSIO_ENTITY_ENV_VAR_KEY
from letta.functions.async_composio_toolset import AsyncComposioToolSet
from letta.utils import run_async_task
# TODO: This is kind of hacky, as this is used to search up the action later on composio's side
@@ -61,36 +61,32 @@ def {func_name}(**kwargs):
async def execute_composio_action_async(
action_name: str, args: dict, api_key: Optional[str] = None, entity_id: Optional[str] = None
) -> tuple[str, str]:
entity_id = entity_id or os.getenv(COMPOSIO_ENTITY_ENV_VAR_KEY, DEFAULT_ENTITY_ID)
composio_toolset = AsyncComposioToolSet(api_key=api_key, entity_id=entity_id, lock=False)
try:
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, execute_composio_action, action_name, args, api_key, entity_id)
response = await composio_toolset.execute_action(action=action_name, params=args)
except ApiKeyNotProvidedError as e:
raise RuntimeError(f"API key not provided or invalid for Composio action '{action_name}': {str(e)}")
except ConnectedAccountNotFoundError as e:
raise RuntimeError(f"Connected account not found for Composio action '{action_name}': {str(e)}")
except EnumMetadataNotFound as e:
raise RuntimeError(f"Enum metadata not found for Composio action '{action_name}': {str(e)}")
except EnumStringNotFound as e:
raise RuntimeError(f"Enum string not found for Composio action '{action_name}': {str(e)}")
except ComposioSDKError as e:
raise RuntimeError(f"Composio SDK error while executing action '{action_name}': {str(e)}")
except Exception as e:
raise RuntimeError(f"Error in execute_composio_action_async: {e}") from e
print(type(e))
raise RuntimeError(f"An unexpected error occurred in Composio SDK while executing action '{action_name}': {str(e)}")
if "error" in response and response["error"]:
raise RuntimeError(f"Error while executing action '{action_name}': {str(response['error'])}")
return response.get("data")
def execute_composio_action(action_name: str, args: dict, api_key: Optional[str] = None, entity_id: Optional[str] = None) -> Any:
entity_id = entity_id or os.getenv(COMPOSIO_ENTITY_ENV_VAR_KEY, DEFAULT_ENTITY_ID)
try:
composio_toolset = ComposioToolSet(api_key=api_key, entity_id=entity_id, lock=False)
response = composio_toolset.execute_action(action=action_name, params=args)
except ApiKeyNotProvidedError:
raise RuntimeError(
f"Composio API key is missing for action '{action_name}'. "
"Please set the sandbox environment variables either through the ADE or the API."
)
except ConnectedAccountNotFoundError:
raise RuntimeError(f"No connected account was found for action '{action_name}'. " "Please link an account and try again.")
except EnumStringNotFound as e:
raise RuntimeError(f"Invalid value provided for action '{action_name}': " + str(e) + ". Please check the action parameters.")
except EnumMetadataNotFound as e:
raise RuntimeError(f"Invalid value provided for action '{action_name}': " + str(e) + ". Please check the action parameters.")
except ComposioSDKError as e:
raise RuntimeError(f"An unexpected error occurred in Composio SDK while executing action '{action_name}': " + str(e))
if "error" in response and response["error"]:
raise RuntimeError(f"Error while executing action '{action_name}': " + str(response["error"]))
return response.get("data")
return run_async_task(execute_composio_action_async(action_name, args, api_key, entity_id))
def _assert_code_gen_compilable(code_str):