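"""Tests for deriving OpenAI JSON schemas from tool source code.

Covers schema generation from example tool files, conversion to strict structured outputs,
live OpenAI round-trips, Google-style docstring validation, and enable_strict_mode behavior.
"""
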
import importlib.util
import inspect
import json
import multiprocessing as mp
import os
import time
from functools import partial

import pytest
import requests
from pydantic import BaseModel

from letta.functions.functions import derive_openai_json_schema
from letta.functions.schema_generator import validate_google_style_docstring
from letta.helpers.tool_execution_helper import enable_strict_mode
from letta.llm_api.helpers import convert_to_structured_output
from letta.schemas.tool import MCP_TOOL_METADATA_SCHEMA_STATUS, Tool


def _clean_diff(d1, d2):
    """Utility function to clean up the diff between two dictionaries."""

    # Keys in d1 but not in d2
    removed = {k: d1[k] for k in d1.keys() - d2.keys()}

    # Keys in d2 but not in d1
    added = {k: d2[k] for k in d2.keys() - d1.keys()}

    # Keys in both but values changed
    changed = {k: (d1[k], d2[k]) for k in d1.keys() & d2.keys() if d1[k] != d2[k]}

    # Only include non-empty differences
    return {k: v for k, v in {"removed": removed, "added": added, "changed": changed}.items() if v}


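# Illustrative example (follows directly from the set operations above):
#   _clean_diff({"a": 1, "b": 2}, {"b": 3, "c": 4})
#   -> {"removed": {"a": 1}, "added": {"c": 4}, "changed": {"b": (2, 3)}}

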
def _compare_schemas(generated_schema: dict, expected_schema: dict, strip_heartbeat: bool = True):
    """Compare an autogenerated schema to an expected schema."""

    if strip_heartbeat:
        # Pop out the heartbeat parameter
        del generated_schema["parameters"]["properties"]["request_heartbeat"]
        # Remove it from the required list
        generated_schema["parameters"]["required"].remove("request_heartbeat")

    # Check that the two schemas are equal
    # If not, pretty-print the difference by dumping with indent=4
    if generated_schema != expected_schema:
        print("==== GENERATED SCHEMA ====")
        print(json.dumps(generated_schema, indent=4))
        print("==== EXPECTED SCHEMA ====")
        print(json.dumps(expected_schema, indent=4))
        print("==== DIFF ====")
        print(json.dumps(_clean_diff(generated_schema, expected_schema), indent=4))
        raise AssertionError("Schemas are not equal")
    else:
        print("Schemas are equal")


def _run_schema_test(schema_name: str, desired_function_name: str, expect_structured_output_fail: bool = False):
    """Load a file and compare the autogenerated schema to the expected schema."""

    # Open the python file as a string
    # Use the absolute path to make it easier to run the test from the root directory
    with open(os.path.join(os.path.dirname(__file__), f"test_tool_schema_parsing_files/{schema_name}.py"), "r") as file:
        source_code = file.read()

    # Derive the schema
    schema = derive_openai_json_schema(source_code, name=desired_function_name)

    # Assert that the schema matches the expected schema
    with open(os.path.join(os.path.dirname(__file__), f"test_tool_schema_parsing_files/{schema_name}.json"), "r") as file:
        expected_schema = json.load(file)

    _compare_schemas(schema, expected_schema, strip_heartbeat=False)

    # Convert to structured output and compare
    if expect_structured_output_fail:
        with pytest.raises(ValueError):
            convert_to_structured_output(schema)
    else:
        structured_output = convert_to_structured_output(schema)

        with open(os.path.join(os.path.dirname(__file__), f"test_tool_schema_parsing_files/{schema_name}_so.json"), "r") as file:
            expected_structured_output = json.load(file)

        _compare_schemas(structured_output, expected_structured_output, strip_heartbeat=False)

    return (schema_name, True)  # Return success status


def test_derive_openai_json_schema():
    """Test that the schema generator works across a variety of example source code inputs."""

    # Define test cases as (schema_name, function_name, expect_structured_output_fail)
    test_cases = [
        ("pydantic_as_single_arg_example", "create_step", False),
        ("list_of_pydantic_example", "create_task_plan", False),
        # ("nested_pydantic_as_arg_example", "create_task_plan", False),
        ("simple_d20", "roll_d20", False),
        ("all_python_complex", "check_order_status", True),
        ("all_python_complex_nodict", "check_order_status", False),
    ]

    # Create a multiprocessing pool
    pool = mp.Pool(processes=min(mp.cpu_count(), len(test_cases)))

    try:
        # Run tests in parallel
        results = []
        for schema_name, function_name, expect_fail in test_cases:
            print(f"==== TESTING {schema_name} ====")
            # Use apply_async for non-blocking parallel execution
            result = pool.apply_async(_run_schema_test, args=(schema_name, function_name, expect_fail))
            results.append((schema_name, result))

        # Collect results and check for failures
        for schema_name, result in results:
            try:
                _schema_name_result, success = result.get(timeout=60)  # Wait for the result with timeout
                assert success, f"Test for {schema_name} failed"
                print(f"Test for {schema_name} passed")
            except Exception as e:
                print(f"Test for {schema_name} failed with error: {str(e)}")
                raise
    finally:
        # Clean up the pool even if a test case fails
        pool.close()
        pool.join()


def _openai_payload(test_config):
    """Build and send an OpenAI chat completion payload with a tool attached.

    Args:
        test_config: A tuple containing (filename, model, structured_output)

    Returns:
        A tuple of (filename, model, structured_output, success, error_message)
    """
    filename, model, structured_output = test_config
    success = False
    error_message = None

    try:
        # Load schema
        with open(os.path.join(os.path.dirname(__file__), f"test_tool_schema_parsing_files/{filename}.py"), "r") as file:
            source_code = file.read()

        schema = derive_openai_json_schema(source_code)

        # Check if we expect the conversion to fail
        if filename == "all_python_complex" and structured_output:
            try:
                convert_to_structured_output(schema)
                error_message = "Expected ValueError for all_python_complex with structured_output=True"
                return (filename, model, structured_output, False, error_message)
            except ValueError:
                # This is expected
                success = True
                return (filename, model, structured_output, success, error_message)

        # Generate tool schema
        if structured_output:
            tool_schema = convert_to_structured_output(schema)
        else:
            tool_schema = schema

        api_key = os.getenv("OPENAI_API_KEY")
        assert api_key is not None, "OPENAI_API_KEY must be set"

        # Simple system prompt to encourage the LLM to jump directly to a tool call
        system_prompt = "Your job is to test the tool that you've been provided. Don't ask for any clarification on the args, just come up with some dummy data and try executing the tool."

        url = "https://api.openai.com/v1/chat/completions"
        headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
        data = {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
            ],
            "tools": [
                {
                    "type": "function",
                    "function": tool_schema,
                }
            ],
            "tool_choice": "auto",
            "parallel_tool_calls": False,
        }

        response = requests.post(url, headers=headers, json=data)
        response.raise_for_status()
        success = True

    except Exception as e:
        error_message = str(e)

    return (filename, model, structured_output, success, error_message)


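# NOTE: The round-trip tests below make live OpenAI API calls and require OPENAI_API_KEY to be
# set (asserted inside the helpers). Success is judged by the HTTP request completing, via
# response.raise_for_status().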
@pytest.mark.parametrize("openai_model", ["gpt-4o"])
@pytest.mark.parametrize("structured_output", [True, False])
def test_valid_schemas_via_openai(openai_model: str, structured_output: bool):
    """Test that we can send the schemas to OpenAI and get a tool call back."""

    start_time = time.time()

    # Define all test configurations
    filenames = [
        "pydantic_as_single_arg_example",
        "list_of_pydantic_example",
        "nested_pydantic_as_arg_example",
        "simple_d20",
        "all_python_complex",
        "all_python_complex_nodict",
    ]

    test_configs = [(filename, openai_model, structured_output) for filename in filenames]

    # Run tests in parallel using a process pool (more efficient for API calls)
    pool = mp.Pool(processes=min(mp.cpu_count(), len(test_configs)))
    try:
        results = pool.map(_openai_payload, test_configs)
    finally:
        pool.close()
        pool.join()

    # Check results and handle failures
    for filename, model, structured, success, error_message in results:
        print(f"Test for {filename}, {model}, structured_output={structured}: {'SUCCESS' if success else 'FAILED'}")

        if not success:
            # all_python_complex with structured output is the one expected-failure case; fail the
            # test if the expected ValueError did not occur, or if any other case failed
            if filename == "all_python_complex" and structured and "Expected ValueError" in error_message:
                pytest.fail(f"Failed for {filename} with {model}, structured_output={structured}: {error_message}")
            elif not (filename == "all_python_complex" and structured):
                pytest.fail(f"Failed for {filename} with {model}, structured_output={structured}: {error_message}")

    end_time = time.time()
    print(f"Total execution time: {end_time - start_time:.2f} seconds")


# Helper function for pydantic args schema test
def _run_pydantic_args_test(filename, openai_model, structured_output):
    """Run a single pydantic args schema test case."""
    try:
        # Import the module dynamically
        file_path = os.path.join(os.path.dirname(__file__), f"test_tool_schema_parsing_files/{filename}.py")
        spec = importlib.util.spec_from_file_location(filename, file_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Find the function definition and args schema if defined
        last_function_name, last_function_source, last_model_class = None, None, None
        for name, obj in inspect.getmembers(module):
            if inspect.isfunction(obj) and obj.__module__ == module.__name__:
                last_function_name = name
                last_function_source = inspect.getsource(obj)  # only import the function, not the whole file
            if inspect.isclass(obj) and obj.__module__ == module.__name__ and issubclass(obj, BaseModel):
                last_model_class = obj

        # Get the ArgsSchema if it exists
        args_schema = None
        if last_model_class:
            args_schema = last_model_class.model_json_schema()

        tool = Tool(
            name=last_function_name,
            source_code=last_function_source,
            args_json_schema=args_schema,
        )
        from letta.services.tool_schema_generator import generate_schema_for_tool_creation

        tool.json_schema = generate_schema_for_tool_creation(tool)
        schema = tool.json_schema

        # We expect this to fail for all_python_complex with structured_output=True
        if filename == "all_python_complex" and structured_output:
            try:
                convert_to_structured_output(schema)
                return (filename, False, "Expected ValueError but conversion succeeded")
            except ValueError:
                return (filename, True, None)  # This is expected

        # Make the API call
        if structured_output:
            tool_schema = convert_to_structured_output(schema)
        else:
            tool_schema = schema

        api_key = os.getenv("OPENAI_API_KEY")
        assert api_key is not None, "OPENAI_API_KEY must be set"

        system_prompt = "Your job is to test the tool that you've been provided. Don't ask for any clarification on the args, just come up with some dummy data and try executing the tool."

        url = "https://api.openai.com/v1/chat/completions"
        headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"}
        data = {
            "model": openai_model,
            "messages": [
                {"role": "system", "content": system_prompt},
            ],
            "tools": [
                {
                    "type": "function",
                    "function": tool_schema,
                }
            ],
            "tool_choice": "auto",
            "parallel_tool_calls": False,
        }

        response = requests.post(url, headers=headers, json=data)
        response.raise_for_status()
        return (filename, True, None)  # Success
    except Exception as e:
        return (filename, False, str(e))  # Failure with error message


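# Unlike _openai_payload above, the helper-driven test below builds a Tool with an explicit
# args_json_schema (taken from the last Pydantic model defined in the example file) and uses
# generate_schema_for_tool_creation, rather than deriving the schema from the source alone.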
@pytest.mark.parametrize("openai_model", ["gpt-4o"])
@pytest.mark.parametrize("structured_output", [True, False])
def test_valid_schemas_with_pydantic_args_schema(openai_model: str, structured_output: bool):
    """Test that we can send the schemas to OpenAI and get a tool call back."""

    start_time = time.time()

    filenames = [
        "pydantic_as_single_arg_example",
        "list_of_pydantic_example",
        "nested_pydantic_as_arg_example",
        "simple_d20",
        "all_python_complex",
        "all_python_complex_nodict",
    ]

    # Create a pool of processes and map the work to it
    pool = mp.Pool(processes=min(mp.cpu_count(), len(filenames)))
    try:
        func = partial(_run_pydantic_args_test, openai_model=openai_model, structured_output=structured_output)
        results = pool.map(func, filenames)
    finally:
        pool.close()
        pool.join()

    # Check results
    for filename, success, error_message in results:
        print(f"Test for {filename}: {'SUCCESS' if success else 'FAILED - ' + error_message}")

        # Special handling for the expected failure
        if filename == "all_python_complex" and structured_output:
            assert success, f"Expected failure handling for {filename} didn't work: {error_message}"
        else:
            assert success, f"Test for {filename} failed: {error_message}"

    end_time = time.time()
    print(f"Total execution time: {end_time - start_time:.2f} seconds")


# Google-style docstring validation tests


# ---------- helpers ----------
def _check(fn, expected_regex: str | None = None):
    if expected_regex is None:
        # should pass
        validate_google_style_docstring(fn)
    else:
        with pytest.raises(ValueError, match=expected_regex):
            validate_google_style_docstring(fn)


# ---------- passing cases ----------
def good_function(file_requests: list, close_all_others: bool = False) -> str:
    """Open files.

    Args:
        file_requests (list): Requests.
        close_all_others (bool): Flag.

    Returns:
        str: Status.
    """
    return "ok"


def good_function_no_return(file_requests: list, close_all_others: bool = False) -> str:
    """Open files.

    Args:
        file_requests (list): Requests.
        close_all_others (bool): Flag.
    """
    return "ok"


def agent_state_ok(agent_state, value: int) -> str:
    """Ignores agent_state param.

    Args:
        value (int): Some value.

    Returns:
        str: Status.
    """
    return "ok"


class Dummy:
    def method(self, bar: int) -> str:  # keeps an explicit self
        """Bound-method example.

        Args:
            bar (int): Number.

        Returns:
            str: Status.
        """
        return "ok"


# ---------- failing cases ----------
def no_doc(x: int) -> str:
    return "fail"


def no_args(x: int) -> str:
    """Missing Args.

    Returns:
        str: Status.
    """
    return "fail"


def missing_param_doc(x: int, y: int) -> str:
    """Only one param documented.

    Args:
        x (int): X.

    Returns:
        str: Status.
    """
    return "fail"


# ---------- parametrized test ----------
@pytest.mark.parametrize(
    "fn, regex",
    [
        (good_function, None),
        (agent_state_ok, None),
        (Dummy.method, None),  # unbound method keeps `self`
        (good_function_no_return, None),
        (no_doc, "has no docstring"),
        (no_args, "must have 'Args:' section"),
        (missing_param_doc, "parameter 'y' not documented"),
    ],
)
def test_google_style_docstring_validation(fn, regex):
    _check(fn, regex)


def test_reserved_params_excluded_from_schema():
    """Test that reserved params (agent_state) are excluded from generated schema."""
    from letta.functions.schema_generator import generate_schema

    # Test with agent_state param
    schema = generate_schema(agent_state_ok)
    assert "agent_state" not in schema["parameters"]["properties"], "agent_state should be excluded from schema"
    assert "value" in schema["parameters"]["properties"], "value should be in schema"
    assert schema["parameters"]["required"] == ["value"], "only value should be required"


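# Background for the next test (summarizing its docstring): a "simple" anyOf over primitives,
# e.g. {"anyOf": [{"type": "string"}, {"type": "null"}]}, can be flattened to a type array
# {"type": ["string", "null"]}, while an anyOf containing an object schema must stay as anyOf.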
def test_complex_nested_anyof_schema_to_structured_output():
    """Test that complex nested anyOf schemas with inlined $refs can be converted to structured outputs.

    This test verifies that convert_to_structured_output properly handles:
    - Simple anyOf (primitives) - flattened to type arrays
    - Complex anyOf (with objects) - preserved as anyOf
    - Nested structures with recursion
    """

    # This is the schema generated by our anyOf inlining approach for MCP tools
    # It uses anyOf throughout and has fully inlined $refs
    schema = {
        "name": "get_vehicle_configuration",
        "description": "Get vehicle configuration details for a given model type and optional dealer info and customization options.",
        "parameters": {
            "$defs": {
                "Feature": {
                    "properties": {
                        "feature_id": {"anyOf": [{"type": "string"}, {"type": "null"}], "default": None, "title": "Feature ID"},
                        "category_code": {"anyOf": [{"type": "integer"}, {"type": "null"}], "default": None, "title": "Category Code"},
                        "variant_code": {"anyOf": [{"type": "integer"}, {"type": "null"}], "default": None, "title": "Variant Code"},
                        "package_level": {"anyOf": [{"type": "integer"}, {"type": "null"}], "default": None, "title": "Package Level"},
                    },
                    "type": "object",
                    "title": "Feature",
                    "additionalProperties": False,
                },
                "CustomizationData": {
                    "properties": {
                        "has_premium_package": {
                            "anyOf": [{"type": "boolean"}, {"type": "null"}],
                            "default": None,
                            "title": "Has Premium Package",
                        },
                        "has_multiple_trims": {
                            "anyOf": [{"type": "boolean"}, {"type": "null"}],
                            "default": None,
                            "title": "Has Multiple Trims",
                        },
                        "selected_features": {
                            "anyOf": [{"items": {"$ref": "#/$defs/Feature"}, "type": "array"}, {"type": "null"}],
                            "default": None,
                            "title": "Selected Features",
                        },
                    },
                    "type": "object",
                    "title": "CustomizationData",
                    "additionalProperties": False,
                },
                "VehicleModel": {
                    "type": "string",
                    "enum": [
                        "sedan",
                        "suv",
                        "truck",
                        "coupe",
                        "hatchback",
                        "minivan",
                        "wagon",
                        "convertible",
                        "sports",
                        "luxury",
                        "electric",
                        "hybrid",
                        "compact",
                        "crossover",
                        "other",
                        "unknown",
                    ],
                    "title": "VehicleModel",
                },
            },
            "properties": {
                "model_type": {
                    "description": "The vehicle model type selection.",
                    "title": "Model Type",
                    "type": "string",
                    "enum": [
                        "sedan",
                        "suv",
                        "truck",
                        "coupe",
                        "hatchback",
                        "minivan",
                        "wagon",
                        "convertible",
                        "sports",
                        "luxury",
                        "electric",
                        "hybrid",
                        "compact",
                        "crossover",
                        "other",
                        "unknown",
                    ],
                },
                "dealer_location": {
                    "anyOf": [{"type": "string"}, {"type": "null"}],
                    "default": None,
                    "description": "Dealer location identifier from registration system, if available.",
                    "title": "Dealer Location",
                },
                "customization_options": {
                    "anyOf": [
                        {
                            "type": "object",
                            "additionalProperties": False,
                            "properties": {
                                "has_premium_package": {
                                    "anyOf": [{"type": "boolean"}, {"type": "null"}],
                                    "default": None,
                                    "title": "Has Premium Package",
                                },
                                "has_multiple_trims": {
                                    "anyOf": [{"type": "boolean"}, {"type": "null"}],
                                    "default": None,
                                    "title": "Has Multiple Trims",
                                },
                                "selected_features": {
                                    "anyOf": [
                                        {
                                            "items": {
                                                "properties": {
                                                    "feature_id": {
                                                        "anyOf": [{"type": "string"}, {"type": "null"}],
                                                        "default": None,
                                                        "title": "Feature ID",
                                                    },
                                                    "category_code": {
                                                        "anyOf": [{"type": "integer"}, {"type": "null"}],
                                                        "default": None,
                                                        "title": "Category Code",
                                                    },
                                                    "variant_code": {
                                                        "anyOf": [{"type": "integer"}, {"type": "null"}],
                                                        "default": None,
                                                        "title": "Variant Code",
                                                    },
                                                    "package_level": {
                                                        "anyOf": [{"type": "integer"}, {"type": "null"}],
                                                        "default": None,
                                                        "title": "Package Level",
                                                    },
                                                },
                                                "type": "object",
                                                "title": "Feature",
                                                "additionalProperties": False,
                                            },
                                            "type": "array",
                                        },
                                        {"type": "null"},
                                    ],
                                    "default": None,
                                    "title": "Selected Features",
                                },
                            },
                            "title": "CustomizationData",
                        },
                        {"type": "null"},
                    ],
                    "default": None,
                    "description": "Customization preferences for the vehicle from user selections, if available.",
                    "title": "Customization Options",
                },
                "request_heartbeat": {"type": "boolean", "description": "Request an immediate heartbeat after function execution."},
            },
            "required": ["model_type", "request_heartbeat"],
            "type": "object",
            "additionalProperties": False,
        },
    }

    # Attempt to convert to structured output
    # This should succeed if the schema is properly formatted for OpenAI
    try:
        structured_output = convert_to_structured_output(schema)

        # Verify the conversion succeeded and returned a valid schema
        assert "name" in structured_output
        assert "parameters" in structured_output
        assert "strict" in structured_output
        assert structured_output["strict"] is True

        # Verify properties are preserved
        assert "model_type" in structured_output["parameters"]["properties"]
        assert "dealer_location" in structured_output["parameters"]["properties"]
        assert "customization_options" in structured_output["parameters"]["properties"]
        assert "request_heartbeat" in structured_output["parameters"]["properties"]

        # Verify required fields
        # For strict mode, ALL fields must be required (OpenAI requirement)
        assert set(structured_output["parameters"]["required"]) == {
            "model_type",
            "dealer_location",
            "customization_options",
            "request_heartbeat",
        }

        print("✅ Complex nested anyOf schema successfully converted to structured output")
        print(json.dumps(structured_output, indent=2))

    except Exception as e:
        pytest.fail(f"Failed to convert complex nested anyOf schema to structured output: {str(e)}")


# ========== enable_strict_mode tests ==========


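# Expected behavior pinned down by the tests below: with strict=True, enable_strict_mode returns
# a copy of the schema with "strict": true, every property listed under "required", previously
# optional properties made nullable (e.g. {"type": "integer"} -> {"type": ["integer", "null"]}),
# and "additionalProperties": false applied recursively (objects, array items, anyOf branches).
# Tools marked NON_STRICT_ONLY are left non-strict, with only the metadata key removed.

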
def test_enable_strict_mode_adds_all_properties_to_required():
    """Test that enable_strict_mode adds all properties to required array."""
    schema = {
        "name": "test_tool",
        "parameters": {
            "type": "object",
            "properties": {
                "required_field": {"type": "string"},
                "optional_field": {"type": "integer"},
            },
            "required": ["required_field"],
        },
    }

    result = enable_strict_mode(schema, strict=True)

    assert result["strict"] is True
    assert set(result["parameters"]["required"]) == {"required_field", "optional_field"}


def test_enable_strict_mode_makes_optional_fields_nullable():
    """Test that optional fields are made nullable."""
    schema = {
        "name": "test_tool",
        "parameters": {
            "type": "object",
            "properties": {
                "required_field": {"type": "string"},
                "optional_field": {"type": "integer"},
            },
            "required": ["required_field"],
        },
    }

    result = enable_strict_mode(schema, strict=True)

    # Required field should NOT be made nullable
    assert result["parameters"]["properties"]["required_field"]["type"] == "string"
    # Optional field should be made nullable
    assert result["parameters"]["properties"]["optional_field"]["type"] == ["integer", "null"]


def test_enable_strict_mode_recursive_nested_objects():
    """Test recursive handling of nested objects."""
    schema = {
        "name": "test_tool",
        "parameters": {
            "type": "object",
            "properties": {
                "config": {
                    "type": "object",
                    "properties": {
                        "nested_field": {"type": "string"},
                        "another_nested": {"type": "integer"},
                    },
                },
            },
            "required": ["config"],
        },
    }

    result = enable_strict_mode(schema, strict=True)

    nested = result["parameters"]["properties"]["config"]
    assert nested["additionalProperties"] is False
    assert set(nested["required"]) == {"nested_field", "another_nested"}


def test_enable_strict_mode_recursive_arrays():
    """Test recursive handling of arrays with object items."""
    schema = {
        "name": "test_tool",
        "parameters": {
            "type": "object",
            "properties": {
                "items": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": {
                            "item_field": {"type": "string"},
                        },
                    },
                },
            },
            "required": ["items"],
        },
    }

    result = enable_strict_mode(schema, strict=True)

    array_items = result["parameters"]["properties"]["items"]["items"]
    assert array_items["additionalProperties"] is False
    assert array_items["required"] == ["item_field"]


def test_enable_strict_mode_strict_false_no_modification():
    """Test that strict=False doesn't modify schema structure."""
    schema = {
        "name": "test_tool",
        "parameters": {
            "type": "object",
            "properties": {
                "field": {"type": "string"},
            },
            "required": [],
        },
    }

    result = enable_strict_mode(schema, strict=False)

    assert "strict" not in result
    assert result["parameters"]["required"] == []
    # Verify the field type is unchanged
    assert result["parameters"]["properties"]["field"]["type"] == "string"


def test_enable_strict_mode_non_strict_only_tool():
    """Test that NON_STRICT_ONLY tools are not modified."""
    schema = {
        "name": "test_tool",
        MCP_TOOL_METADATA_SCHEMA_STATUS: "NON_STRICT_ONLY",
        "parameters": {
            "type": "object",
            "properties": {
                "field": {"type": "string"},
            },
            "required": [],
        },
    }

    result = enable_strict_mode(schema, strict=True)

    # Strict mode should not be applied
    assert "strict" not in result
    # Metadata should be removed
    assert MCP_TOOL_METADATA_SCHEMA_STATUS not in result
    # Required should be unchanged
    assert result["parameters"]["required"] == []


def test_enable_strict_mode_preserves_existing_required():
    """Test that fields already in required are not made nullable."""
    schema = {
        "name": "test_tool",
        "parameters": {
            "type": "object",
            "properties": {
                "already_required": {"type": "string"},
                "optional_field": {"type": "integer"},
            },
            "required": ["already_required"],
        },
    }

    result = enable_strict_mode(schema, strict=True)

    # already_required should NOT be made nullable (it was already required)
    assert result["parameters"]["properties"]["already_required"]["type"] == "string"
    # optional_field should be made nullable
    assert result["parameters"]["properties"]["optional_field"]["type"] == ["integer", "null"]
    # Both should now be in required
    assert set(result["parameters"]["required"]) == {"already_required", "optional_field"}


def test_enable_strict_mode_handles_anyof():
    """Test that anyOf structures are recursively processed."""
    schema = {
        "name": "test_tool",
        "parameters": {
            "type": "object",
            "properties": {
                "config": {
                    "anyOf": [
                        {
                            "type": "object",
                            "properties": {
                                "nested_field": {"type": "string"},
                            },
                        },
                        {"type": "null"},
                    ],
                },
            },
            "required": ["config"],
        },
    }

    result = enable_strict_mode(schema, strict=True)

    # The object inside anyOf should have additionalProperties and required set
    anyof_options = result["parameters"]["properties"]["config"]["anyOf"]
    object_option = next(opt for opt in anyof_options if opt.get("type") == "object")
    assert object_option["additionalProperties"] is False
    assert object_option["required"] == ["nested_field"]


def test_enable_strict_mode_handles_type_array_nullable():
    """Test that fields with type array (already nullable) are handled correctly."""
    schema = {
        "name": "test_tool",
        "parameters": {
            "type": "object",
            "properties": {
                "already_nullable": {"type": ["string", "null"]},
                "not_nullable": {"type": "integer"},
            },
            "required": [],
        },
    }

    result = enable_strict_mode(schema, strict=True)

    # Already nullable field should not get a duplicate null
    already_nullable_type = result["parameters"]["properties"]["already_nullable"]["type"]
    assert already_nullable_type.count("null") == 1
    # Not nullable should become nullable
    assert result["parameters"]["properties"]["not_nullable"]["type"] == ["integer", "null"]


def test_enable_strict_mode_does_not_mutate_original():
    """Test that the original schema is not mutated."""
    schema = {
        "name": "test_tool",
        "parameters": {
            "type": "object",
            "properties": {
                "field": {"type": "string"},
            },
            "required": [],
        },
    }

    original_required = schema["parameters"]["required"].copy()
    original_field_type = schema["parameters"]["properties"]["field"]["type"]

    result = enable_strict_mode(schema, strict=True)

    # Original should be unchanged
    assert schema["parameters"]["required"] == original_required
    assert schema["parameters"]["properties"]["field"]["type"] == original_field_type
    assert "strict" not in schema
    # Result should be different
    assert result["strict"] is True
    assert len(result["parameters"]["required"]) == 1