feat: add redis client to CI for background mode tests (#4155)
@@ -41,6 +41,15 @@ jobs:
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
      redis:
        image: redis:7
        ports:
          - 6379:6379
        options: >-
          --health-cmd "redis-cli ping"
          --health-interval 5s
          --health-timeout 5s
          --health-retries 10

    steps:
      # Ensure secrets don't leak
@@ -138,6 +147,8 @@ jobs:
        LETTA_PG_PASSWORD: postgres
        LETTA_PG_DB: postgres
        LETTA_PG_HOST: localhost
        LETTA_REDIS_HOST: localhost
        LETTA_REDIS_PORT: 6379
        LETTA_SERVER_PASS: test_server_token
        OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
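The two LETTA_REDIS_* variables point the server at the Redis service container added above. A minimal connectivity check of that wiring, assuming the redis-py package is available (an assumption, not part of this commit), could look like:

# Minimal CI wiring check (sketch; assumes the redis-py package is installed).
import os

import redis

r = redis.Redis(
    host=os.environ.get("LETTA_REDIS_HOST", "localhost"),
    port=int(os.environ.get("LETTA_REDIS_PORT", "6379")),
)
assert r.ping()  # succeeds once the service's "redis-cli ping" health check passes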
@@ -1132,6 +1132,127 @@ def test_token_streaming_agent_loop_error(
    assert len(messages_from_db) == 0


@pytest.mark.parametrize(
    "llm_config",
    TESTED_LLM_CONFIGS,
    ids=[c.model for c in TESTED_LLM_CONFIGS],
)
def test_background_token_streaming_greeting_with_assistant_message(
    disable_e2b_api_key: Any,
    client: Letta,
    agent_state: AgentState,
    llm_config: LLMConfig,
) -> None:
    """
    Tests sending a background token-streaming message with a synchronous client.
    Checks that each chunk in the stream has the correct message types.
    """
    last_message = client.agents.messages.list(agent_id=agent_state.id, limit=1)
    agent_state = client.agents.modify(agent_id=agent_state.id, llm_config=llm_config)
    # Use a longer message for Anthropic models to test whether they stream in chunks
    if llm_config.model_endpoint_type == "anthropic":
        messages_to_send = USER_MESSAGE_FORCE_LONG_REPLY
    else:
        messages_to_send = USER_MESSAGE_FORCE_REPLY
    response = client.agents.messages.create_stream(
        agent_id=agent_state.id,
        messages=messages_to_send,
        stream_tokens=True,
        background=True,
    )
    messages = accumulate_chunks(
        list(response), verify_token_streaming=(llm_config.model_endpoint_type in ["anthropic", "openai", "bedrock"])
    )
    assert_greeting_with_assistant_message_response(messages, streaming=True, token_streaming=True, llm_config=llm_config)
    messages_from_db = client.agents.messages.list(agent_id=agent_state.id, after=last_message[0].id)
    assert_greeting_with_assistant_message_response(messages_from_db, from_db=True, llm_config=llm_config)

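accumulate_chunks (defined earlier in this test module) merges streamed token deltas back into whole messages before the assertions run. The real helper is not shown in this diff; a rough, hypothetical sketch of the idea:

# NOT the module's real accumulate_chunks; chunk field names here are hypothetical.
def accumulate_chunks_illustration(chunks: list, verify_token_streaming: bool = False) -> list:
    merged = []
    for chunk in chunks:
        prev = merged[-1] if merged else None
        if prev is not None and getattr(chunk, "id", None) == getattr(prev, "id", None):
            # Token streaming splits one logical message across many deltas;
            # stitch the partial content back together.
            prev.content = (getattr(prev, "content", "") or "") + (getattr(chunk, "content", "") or "")
        else:
            merged.append(chunk)
    if verify_token_streaming:
        # With token streaming enabled there should be more raw chunks than merged messages.
        assert len(chunks) > len(merged)
    return merged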
@pytest.mark.parametrize(
    "llm_config",
    TESTED_LLM_CONFIGS,
    ids=[c.model for c in TESTED_LLM_CONFIGS],
)
def test_background_token_streaming_greeting_without_assistant_message(
    disable_e2b_api_key: Any,
    client: Letta,
    agent_state: AgentState,
    llm_config: LLMConfig,
) -> None:
    """
    Tests sending a background token-streaming message with a synchronous client,
    with use_assistant_message disabled.
    Checks that each chunk in the stream has the correct message types.
    """
    last_message = client.agents.messages.list(agent_id=agent_state.id, limit=1)
    agent_state = client.agents.modify(agent_id=agent_state.id, llm_config=llm_config)
    # Use a longer message for Anthropic models to force chunking
    if llm_config.model_endpoint_type == "anthropic":
        messages_to_send = USER_MESSAGE_FORCE_LONG_REPLY
    else:
        messages_to_send = USER_MESSAGE_FORCE_REPLY
    response = client.agents.messages.create_stream(
        agent_id=agent_state.id,
        messages=messages_to_send,
        use_assistant_message=False,
        stream_tokens=True,
        background=True,
    )
    messages = accumulate_chunks(
        list(response), verify_token_streaming=(llm_config.model_endpoint_type in ["anthropic", "openai", "bedrock"])
    )
    assert_greeting_without_assistant_message_response(messages, streaming=True, token_streaming=True, llm_config=llm_config)
    messages_from_db = client.agents.messages.list(agent_id=agent_state.id, after=last_message[0].id, use_assistant_message=False)
    assert_greeting_without_assistant_message_response(messages_from_db, from_db=True, llm_config=llm_config)

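The only difference from the previous test is use_assistant_message=False, passed both to the stream and to the follow-up messages.list call: the greeting is then surfaced as a raw send_message tool call rather than an AssistantMessage, which is why the without-variant assertion helper is used. Loosely, with hypothetical values:

# Same reply, surfaced two ways (hypothetical values, for illustration only):
# use_assistant_message=True  -> AssistantMessage(content="Hello there!")
# use_assistant_message=False -> ToolCallMessage(tool_call=ToolCall(
#                                    name="send_message",
#                                    arguments='{"message": "Hello there!"}'))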
@pytest.mark.parametrize(
    "llm_config",
    TESTED_LLM_CONFIGS,
    ids=[c.model for c in TESTED_LLM_CONFIGS],
)
def test_background_token_streaming_tool_call(
    disable_e2b_api_key: Any,
    client: Letta,
    agent_state: AgentState,
    llm_config: LLMConfig,
) -> None:
    """
    Tests sending a background token-streaming message that triggers a tool call.
    Checks that each chunk in the stream has the correct message types.
    """
    # Get the config filename for this llm_config
    config_filename = None
    for filename in filenames:
        config = get_llm_config(filename)
        if config.model_dump() == llm_config.model_dump():
            config_filename = filename
            break

    # Skip if this is a limited model
    if not config_filename or config_filename in limited_configs:
        pytest.skip(f"Skipping test for limited model {llm_config.model}")

    last_message = client.agents.messages.list(agent_id=agent_state.id, limit=1)
    agent_state = client.agents.modify(agent_id=agent_state.id, llm_config=llm_config)
    # Use a longer message for Anthropic models to force chunking
    if llm_config.model_endpoint_type == "anthropic":
        messages_to_send = USER_MESSAGE_ROLL_DICE_LONG
    else:
        messages_to_send = USER_MESSAGE_ROLL_DICE
    response = client.agents.messages.create_stream(
        agent_id=agent_state.id,
        messages=messages_to_send,
        stream_tokens=True,
        background=True,
    )
    messages = accumulate_chunks(
        list(response), verify_token_streaming=(llm_config.model_endpoint_type in ["anthropic", "openai", "bedrock"])
    )
    assert_tool_call_response(messages, streaming=True, llm_config=llm_config)
    messages_from_db = client.agents.messages.list(agent_id=agent_state.id, after=last_message[0].id)
    assert_tool_call_response(messages_from_db, from_db=True, llm_config=llm_config)

def wait_for_run_completion(client: Letta, run_id: str, timeout: float = 30.0, interval: float = 0.5) -> Run:
    start = time.time()
    while True:
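The diff is truncated at this point. Presumably the loop polls the run until it reaches a terminal status or the timeout elapses; a sketch under that assumption (client.runs.retrieve and the exact status strings are assumed, not shown in this commit):

# Sketch of the rest of wait_for_run_completion; the accessor and statuses
# below are assumptions, not confirmed by this diff.
def wait_for_run_completion_sketch(client: Letta, run_id: str, timeout: float = 30.0, interval: float = 0.5) -> Run:
    start = time.time()
    while True:
        run = client.runs.retrieve(run_id)           # assumed accessor
        if run.status in ("completed", "failed"):    # assumed terminal statuses
            return run
        if time.time() - start > timeout:
            raise TimeoutError(f"Run {run_id} did not complete within {timeout}s")
        time.sleep(interval)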