diff --git a/.github/workflows/send-message-integration-tests.yaml b/.github/workflows/send-message-integration-tests.yaml index e578bad4..c66336a3 100644 --- a/.github/workflows/send-message-integration-tests.yaml +++ b/.github/workflows/send-message-integration-tests.yaml @@ -41,6 +41,15 @@ jobs: --health-interval 10s --health-timeout 5s --health-retries 5 + redis: + image: redis:7 + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 5s + --health-timeout 5s + --health-retries 10 steps: # Ensure secrets don't leak @@ -138,6 +147,8 @@ jobs: LETTA_PG_PASSWORD: postgres LETTA_PG_DB: postgres LETTA_PG_HOST: localhost + LETTA_REDIS_HOST: localhost + LETTA_REDIS_PORT: 6379 LETTA_SERVER_PASS: test_server_token OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }} diff --git a/tests/integration_test_send_message.py b/tests/integration_test_send_message.py index acf2b733..47731fd1 100644 --- a/tests/integration_test_send_message.py +++ b/tests/integration_test_send_message.py @@ -1132,6 +1132,127 @@ def test_token_streaming_agent_loop_error( assert len(messages_from_db) == 0 +@pytest.mark.parametrize( + "llm_config", + TESTED_LLM_CONFIGS, + ids=[c.model for c in TESTED_LLM_CONFIGS], +) +def test_background_token_streaming_greeting_with_assistant_message( + disable_e2b_api_key: Any, + client: Letta, + agent_state: AgentState, + llm_config: LLMConfig, +) -> None: + """ + Tests sending a token-streaming message with background=True (stream_tokens=True) for each tested LLM config. + Checks the accumulated stream chunks, then the messages listed after the previous last message, against the greeting-with-assistant-message expectations.
+ """ + last_message = client.agents.messages.list(agent_id=agent_state.id, limit=1) + agent_state = client.agents.modify(agent_id=agent_state.id, llm_config=llm_config) + # Use longer message for Anthropic models to test if they stream in chunks + if llm_config.model_endpoint_type == "anthropic": + messages_to_send = USER_MESSAGE_FORCE_LONG_REPLY + else: + messages_to_send = USER_MESSAGE_FORCE_REPLY + response = client.agents.messages.create_stream( + agent_id=agent_state.id, + messages=messages_to_send, + stream_tokens=True, + background=True, + ) + messages = accumulate_chunks( + list(response), verify_token_streaming=(llm_config.model_endpoint_type in ["anthropic", "openai", "bedrock"]) + ) + assert_greeting_with_assistant_message_response(messages, streaming=True, token_streaming=True, llm_config=llm_config) + messages_from_db = client.agents.messages.list(agent_id=agent_state.id, after=last_message[0].id) + assert_greeting_with_assistant_message_response(messages_from_db, from_db=True, llm_config=llm_config) + + +@pytest.mark.parametrize( + "llm_config", + TESTED_LLM_CONFIGS, + ids=[c.model for c in TESTED_LLM_CONFIGS], +) +def test_background_token_streaming_greeting_without_assistant_message( + disable_e2b_api_key: Any, + client: Letta, + agent_state: AgentState, + llm_config: LLMConfig, +) -> None: + """ + Tests sending a token-streaming message with background=True and use_assistant_message=False for each tested LLM config. + Checks the accumulated stream chunks, then the messages listed after the previous last message, against the greeting-without-assistant-message expectations.
+ """ + last_message = client.agents.messages.list(agent_id=agent_state.id, limit=1) + agent_state = client.agents.modify(agent_id=agent_state.id, llm_config=llm_config) + # Use longer message for Anthropic models to force chunking + if llm_config.model_endpoint_type == "anthropic": + messages_to_send = USER_MESSAGE_FORCE_LONG_REPLY + else: + messages_to_send = USER_MESSAGE_FORCE_REPLY + response = client.agents.messages.create_stream( + agent_id=agent_state.id, + messages=messages_to_send, + use_assistant_message=False, + stream_tokens=True, + background=True, + ) + messages = accumulate_chunks( + list(response), verify_token_streaming=(llm_config.model_endpoint_type in ["anthropic", "openai", "bedrock"]) + ) + assert_greeting_without_assistant_message_response(messages, streaming=True, token_streaming=True, llm_config=llm_config) + messages_from_db = client.agents.messages.list(agent_id=agent_state.id, after=last_message[0].id, use_assistant_message=False) + assert_greeting_without_assistant_message_response(messages_from_db, from_db=True, llm_config=llm_config) + + +@pytest.mark.parametrize( + "llm_config", + TESTED_LLM_CONFIGS, + ids=[c.model for c in TESTED_LLM_CONFIGS], +) +def test_background_token_streaming_tool_call( + disable_e2b_api_key: Any, + client: Letta, + agent_state: AgentState, + llm_config: LLMConfig, +) -> None: + """ + Tests sending a token-streaming roll-dice message with background=True, skipping configs that are unmatched or in limited_configs. + Checks the accumulated stream chunks, then the messages listed after the previous last message, with assert_tool_call_response.
+ """ + # find the config filename whose loaded config equals llm_config + config_filename = None + for filename in filenames: + config = get_llm_config(filename) + if config.model_dump() == llm_config.model_dump(): + config_filename = filename + break + + # skip if no config file matched or this is a limited model + if not config_filename or config_filename in limited_configs: + pytest.skip(f"Skipping test for limited model {llm_config.model}") + + last_message = client.agents.messages.list(agent_id=agent_state.id, limit=1) + agent_state = client.agents.modify(agent_id=agent_state.id, llm_config=llm_config) + # Use longer message for Anthropic models to force chunking + if llm_config.model_endpoint_type == "anthropic": + messages_to_send = USER_MESSAGE_ROLL_DICE_LONG + else: + messages_to_send = USER_MESSAGE_ROLL_DICE + response = client.agents.messages.create_stream( + agent_id=agent_state.id, + messages=messages_to_send, + stream_tokens=True, + background=True, + ) + messages = accumulate_chunks( + list(response), verify_token_streaming=(llm_config.model_endpoint_type in ["anthropic", "openai", "bedrock"]) + ) + assert_tool_call_response(messages, streaming=True, llm_config=llm_config) + messages_from_db = client.agents.messages.list(agent_id=agent_state.id, after=last_message[0].id) + assert_tool_call_response(messages_from_db, from_db=True, llm_config=llm_config) + + def wait_for_run_completion(client: Letta, run_id: str, timeout: float = 30.0, interval: float = 0.5) -> Run: start = time.time() while True: