This commit is contained in:
trongtrandp 2026-03-24 13:52:02 +08:00 committed by GitHub
commit cf3c4b8e61
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 2078 additions and 67 deletions

View File

@ -328,10 +328,10 @@ class BaseAgentRunner(AppRunner):
self,
agent_thought_id: str,
tool_name: str | None,
tool_input: Union[str, dict, None],
tool_input: Union[str, dict, list, None],
thought: str | None,
observation: Union[str, dict, None],
tool_invoke_meta: Union[str, dict, None],
observation: Union[str, dict, list, None],
tool_invoke_meta: Union[str, dict, list, None],
answer: str | None,
messages_ids: list[str],
llm_usage: LLMUsage | None = None,
@ -352,7 +352,7 @@ class BaseAgentRunner(AppRunner):
agent_thought.tool = tool_name
if tool_input:
if isinstance(tool_input, dict):
if isinstance(tool_input, (dict, list)):
try:
tool_input = json.dumps(tool_input, ensure_ascii=False)
except Exception:
@ -361,7 +361,7 @@ class BaseAgentRunner(AppRunner):
agent_thought.tool_input = tool_input
if observation:
if isinstance(observation, dict):
if isinstance(observation, (dict, list)):
try:
observation = json.dumps(observation, ensure_ascii=False)
except Exception:
@ -401,7 +401,7 @@ class BaseAgentRunner(AppRunner):
agent_thought.tool_labels_str = json.dumps(labels)
if tool_invoke_meta is not None:
if isinstance(tool_invoke_meta, dict):
if isinstance(tool_invoke_meta, (dict, list)):
try:
tool_invoke_meta = json.dumps(tool_invoke_meta, ensure_ascii=False)
except Exception:
@ -450,43 +450,100 @@ class BaseAgentRunner(AppRunner):
tool_calls: list[AssistantPromptMessage.ToolCall] = []
tool_call_response: list[ToolPromptMessage] = []
tool_input_payload = agent_thought.tool_input
tool_inputs_parsed = None
if tool_input_payload:
try:
tool_inputs = json.loads(tool_input_payload)
tool_inputs_parsed = json.loads(tool_input_payload)
except Exception:
tool_inputs = {tool: {} for tool in tool_names}
else:
tool_inputs = {tool: {} for tool in tool_names}
pass
observation_payload = agent_thought.observation
tool_responses_parsed = None
if observation_payload:
try:
tool_responses = json.loads(observation_payload)
tool_responses_parsed = json.loads(observation_payload)
except Exception:
tool_responses = dict.fromkeys(tool_names, observation_payload)
else:
tool_responses = dict.fromkeys(tool_names, observation_payload)
pass
for tool in tool_names:
# generate a uuid for tool call
tool_call_id = str(uuid.uuid4())
tool_calls.append(
AssistantPromptMessage.ToolCall(
id=tool_call_id,
type="function",
function=AssistantPromptMessage.ToolCall.ToolCallFunction(
if isinstance(tool_inputs_parsed, list):
# New array format - iterate by index
for idx, item in enumerate(tool_inputs_parsed):
tool_call_id = str(uuid.uuid4())
if isinstance(item, dict) and "name" in item:
tool_name = item["name"]
tool_args = item.get("arguments", {})
else:
tool_name = tool_names[idx] if idx < len(tool_names) else f"tool_{idx}"
tool_args = item if isinstance(item, dict) else {}
tool_calls.append(
AssistantPromptMessage.ToolCall(
id=tool_call_id,
type="function",
function=AssistantPromptMessage.ToolCall.ToolCallFunction(
name=tool_name,
arguments=json.dumps(tool_args),
),
)
)
# Get corresponding response
tool_resp_content = ""
if isinstance(tool_responses_parsed, list):
if idx < len(tool_responses_parsed):
resp_item = tool_responses_parsed[idx]
resp_content = (
resp_item["output"]
if isinstance(resp_item, dict) and "output" in resp_item
else resp_item
)
tool_resp_content = (
json.dumps(resp_content)
if isinstance(resp_content, (dict, list))
else str(resp_content)
)
elif observation_payload:
tool_resp_content = observation_payload
tool_call_response.append(
ToolPromptMessage(
content=tool_resp_content,
name=tool_name,
tool_call_id=tool_call_id,
)
)
else:
# Old dict format - existing logic
tool_inputs = (
tool_inputs_parsed
if isinstance(tool_inputs_parsed, dict)
else {t: {} for t in tool_names}
)
tool_responses = (
tool_responses_parsed
if isinstance(tool_responses_parsed, dict)
else dict.fromkeys(tool_names, observation_payload)
)
for tool in tool_names:
tool_call_id = str(uuid.uuid4())
tool_calls.append(
AssistantPromptMessage.ToolCall(
id=tool_call_id,
type="function",
function=AssistantPromptMessage.ToolCall.ToolCallFunction(
name=tool,
arguments=json.dumps(tool_inputs.get(tool, {})),
),
)
)
tool_call_response.append(
ToolPromptMessage(
content=tool_responses.get(tool, agent_thought.observation),
name=tool,
arguments=json.dumps(tool_inputs.get(tool, {})),
),
tool_call_id=tool_call_id,
)
)
)
tool_call_response.append(
ToolPromptMessage(
content=tool_responses.get(tool, agent_thought.observation),
name=tool,
tool_call_id=tool_call_id,
)
)
result.extend(
[

View File

@ -126,11 +126,13 @@ class FunctionCallAgentRunner(BaseAgentRunner):
tool_call_names = ";".join([tool_call[1] for tool_call in tool_calls])
try:
tool_call_inputs = json.dumps(
{tool_call[1]: tool_call[2] for tool_call in tool_calls}, ensure_ascii=False
[{"name": tool_call[1], "arguments": tool_call[2]} for tool_call in tool_calls],
ensure_ascii=False,
)
except TypeError:
# fallback: force ASCII to handle non-serializable objects
tool_call_inputs = json.dumps({tool_call[1]: tool_call[2] for tool_call in tool_calls})
tool_call_inputs = json.dumps(
[{"name": tool_call[1], "arguments": tool_call[2]} for tool_call in tool_calls]
)
if chunk.delta.message and chunk.delta.message.content:
if isinstance(chunk.delta.message.content, list):
@ -153,11 +155,13 @@ class FunctionCallAgentRunner(BaseAgentRunner):
tool_call_names = ";".join([tool_call[1] for tool_call in tool_calls])
try:
tool_call_inputs = json.dumps(
{tool_call[1]: tool_call[2] for tool_call in tool_calls}, ensure_ascii=False
[{"name": tool_call[1], "arguments": tool_call[2]} for tool_call in tool_calls],
ensure_ascii=False,
)
except TypeError:
# fallback: force ASCII to handle non-serializable objects
tool_call_inputs = json.dumps({tool_call[1]: tool_call[2] for tool_call in tool_calls})
tool_call_inputs = json.dumps(
[{"name": tool_call[1], "arguments": tool_call[2]} for tool_call in tool_calls]
)
if result.usage:
increase_usage(llm_usage, result.usage)
@ -284,13 +288,14 @@ class FunctionCallAgentRunner(BaseAgentRunner):
tool_name="",
tool_input="",
thought="",
tool_invoke_meta={
tool_response["tool_call_name"]: tool_response["meta"] for tool_response in tool_responses
},
observation={
tool_response["tool_call_name"]: tool_response["tool_response"]
tool_invoke_meta=[
{"name": tool_response["tool_call_name"], "meta": tool_response["meta"]}
for tool_response in tool_responses
},
],
observation=[
{"name": tool_response["tool_call_name"], "output": tool_response["tool_response"]}
for tool_response in tool_responses
],
answer="",
messages_ids=message_file_ids,
)

View File

@ -2292,6 +2292,37 @@ class MessageAgentThought(TypeBase):
else:
return []
@staticmethod
def parse_array_with_ordinal_keys(
items: list, tools: list[str], value_key: str, default_value: Any = None
) -> dict[str, Any]:
"""Parse array format items into dict with ordinal keys for duplicate names.
Args:
items: List of dicts with "name" and value_key fields.
tools: Fallback tool names from self.tools.
value_key: Key to extract value from each item (e.g., "arguments", "output", "meta").
default_value: Default when value_key is missing from a named item. Defaults to {}.
Returns:
Dict with ordinal keys: {"search": data, "search__2": data, ...}
"""
if default_value is None:
default_value = {}
result: dict[str, Any] = {}
name_count: dict[str, int] = {}
for i, item in enumerate(items):
if isinstance(item, dict) and "name" in item:
name = item["name"]
value = item.get(value_key, default_value)
else:
name = tools[i] if i < len(tools) else f"tool_{i}"
value = item if isinstance(item, dict) else (item if default_value != {} else {})
name_count[name] = name_count.get(name, 0) + 1
key = name if name_count[name] == 1 else f"{name}__{name_count[name]}"
result[key] = value
return result
@property
def tools(self) -> list[str]:
return self.tool.split(";") if self.tool else []
@ -2310,7 +2341,15 @@ class MessageAgentThought(TypeBase):
def tool_meta(self) -> dict[str, Any]:
try:
if self.tool_meta_str:
return cast(dict[str, Any], json.loads(self.tool_meta_str))
raw = json.loads(self.tool_meta_str)
tools = self.tools
# New array format: [{"name": "search", "meta": {...}}, ...]
if isinstance(raw, list):
return self.parse_array_with_ordinal_keys(raw, tools, "meta")
# Old dict format
if isinstance(raw, dict):
return cast(dict[str, Any], raw)
return {}
else:
return {}
except Exception:
@ -2322,16 +2361,24 @@ class MessageAgentThought(TypeBase):
try:
if self.tool_input:
data = json.loads(self.tool_input)
result: dict[str, Any] = {}
for tool in tools:
if tool in data:
result[tool] = data[tool]
else:
if len(tools) == 1:
result[tool] = data
# New array format: [{"name": "search", "arguments": {...}}, ...]
if isinstance(data, list):
return self.parse_array_with_ordinal_keys(data, tools, "arguments")
# Old dict format: {"tool_name": {...}, ...}
if isinstance(data, dict):
result = {}
for tool in tools:
if tool in data:
result[tool] = data[tool]
else:
result[tool] = {}
return result
if len(tools) == 1:
result[tool] = data
else:
result[tool] = {}
return result
if len(tools) == 1:
return {tools[0]: data}
return {}
else:
return {tool: {} for tool in tools}
except Exception:
@ -2343,16 +2390,24 @@ class MessageAgentThought(TypeBase):
try:
if self.observation:
data = json.loads(self.observation)
result: dict[str, Any] = {}
for tool in tools:
if tool in data:
result[tool] = data[tool]
else:
if len(tools) == 1:
result[tool] = data
# New array format: [{"name": "search", "output": "result"}, ...]
if isinstance(data, list):
return self.parse_array_with_ordinal_keys(data, tools, "output", default_value="")
# Old dict format
if isinstance(data, dict):
result = {}
for tool in tools:
if tool in data:
result[tool] = data[tool]
else:
result[tool] = {}
return result
if len(tools) == 1:
result[tool] = data
else:
result[tool] = {}
return result
if len(tools) == 1:
return {tools[0]: data}
return {}
else:
return {tool: {} for tool in tools}
except Exception:

View File

@ -101,12 +101,17 @@ class AgentService:
tool_inputs = agent_thought.tool_inputs_dict
tool_outputs = agent_thought.tool_outputs_dict or {}
tool_calls = []
for tool in tools:
# Generate ordinal keys using the shared helper for consistency
ordinal_keys = list(MessageAgentThought.parse_array_with_ordinal_keys(
[{"name": t, "arguments": {}} for t in tools], tools, "arguments"
).keys()) if tools else []
for i, tool in enumerate(tools):
tool_name = tool
ordinal_key = ordinal_keys[i] if i < len(ordinal_keys) else tool_name
tool_label = tool_labels.get(tool_name, tool_name)
tool_input = tool_inputs.get(tool_name, {})
tool_output = tool_outputs.get(tool_name, {})
tool_meta_data = tool_meta.get(tool_name, {})
tool_input = tool_inputs.get(ordinal_key, {})
tool_output = tool_outputs.get(ordinal_key, {})
tool_meta_data = tool_meta.get(ordinal_key, {})
tool_config = tool_meta_data.get("tool_config", {})
if tool_config.get("tool_provider_type", "") != "dataset-retrieval":
tool_icon = ToolManager.get_tool_icon(

View File

@ -0,0 +1,539 @@
"""
Data flow integrity tests for the duplicate tool name fix.
These tests verify that the ordinal key algorithm is IDENTICAL across:
- model.py (tool_inputs_dict, tool_outputs_dict, tool_meta)
- agent_service.py (get_agent_logs)
They also verify mixed-format scenarios and history reconstruction paths.
"""
import json
import pytest
from models.model import MessageAgentThought
def _make_thought(**kwargs) -> MessageAgentThought:
    """Build a MessageAgentThought, filling in the required constructor fields."""
    params = dict(
        message_id="msg-1",
        position=1,
        created_by_role="account",
        created_by="user-1",
    )
    # Caller-supplied fields win over the defaults.
    params.update(kwargs)
    return MessageAgentThought(**params)
def _ordinal_keys_from_agent_service(tools: list[str]) -> list[str]:
"""
Reproduce the ordinal key algorithm from agent_service.py get_agent_logs.
This MUST match the algorithm in model.py properties.
"""
keys = []
name_count: dict[str, int] = {}
for tool in tools:
tool_name = tool
name_count[tool_name] = name_count.get(tool_name, 0) + 1
ordinal_key = tool_name if name_count[tool_name] == 1 else f"{tool_name}__{name_count[tool_name]}"
keys.append(ordinal_key)
return keys
def _ordinal_keys_from_model(tools: list[str], data: list[dict]) -> list[str]:
"""
Reproduce the ordinal key algorithm from model.py tool_inputs_dict.
"""
keys = []
name_count: dict[str, int] = {}
for i, item in enumerate(data):
if isinstance(item, dict) and "name" in item:
name = item["name"]
else:
name = tools[i] if i < len(tools) else f"tool_{i}"
name_count[name] = name_count.get(name, 0) + 1
key = name if name_count[name] == 1 else f"{name}__{name_count[name]}"
keys.append(key)
return keys
class TestOrdinalKeyConsistency:
    """CRITICAL: ordinal keys MUST be identical between model.py and agent_service.py."""

    def _assert_keys(self, tools: list[str], expected: list[str]) -> None:
        # Both algorithms must agree with each other AND with the expected keys.
        data = [{"name": name} for name in tools]
        from_service = _ordinal_keys_from_agent_service(tools)
        from_model = _ordinal_keys_from_model(tools, data)
        assert from_service == from_model
        assert from_service == expected

    def test_no_duplicates(self):
        """Unique tool names produce identical ordinal keys."""
        self._assert_keys(
            ["search", "calculator", "weather"],
            ["search", "calculator", "weather"],
        )

    def test_two_duplicates(self):
        """Two identical tool names produce same ordinal keys in both paths."""
        self._assert_keys(["search", "search"], ["search", "search__2"])

    def test_three_duplicates(self):
        """Three identical tool names."""
        self._assert_keys(
            ["search", "search", "search"],
            ["search", "search__2", "search__3"],
        )

    def test_mixed_duplicates_and_unique(self):
        """Mix of duplicate and unique tools."""
        self._assert_keys(
            ["search", "calculator", "search", "search"],
            ["search", "calculator", "search__2", "search__3"],
        )

    def test_multiple_different_duplicates(self):
        """Multiple tools each duplicated."""
        self._assert_keys(
            ["search", "calculator", "search", "calculator"],
            ["search", "calculator", "search__2", "calculator__2"],
        )
class TestWriteReadRoundTrip:
    """Verify data written by fc_agent_runner.py can be read back correctly."""

    def test_tool_input_array_roundtrip(self):
        """Data written as array by fc_agent_runner is correctly parsed by model.py."""
        # Mirrors fc_agent_runner.py, which serializes tool_call_inputs as
        # json.dumps([{"name": tool_call[1], "arguments": tool_call[2]} ...]).
        stored = json.dumps([
            {"name": "search", "arguments": {"q": "python"}},
            {"name": "search", "arguments": {"q": "javascript"}},
            {"name": "calculator", "arguments": {"expr": "2+2"}},
        ])
        parsed = _make_thought(tool="search;search;calculator", tool_input=stored).tool_inputs_dict
        assert parsed["search"] == {"q": "python"}
        assert parsed["search__2"] == {"q": "javascript"}
        assert parsed["calculator"] == {"expr": "2+2"}

    def test_observation_array_roundtrip(self):
        """Data written as array for observation is correctly parsed."""
        # Mirrors fc_agent_runner.py observation entries: {"name": ..., "output": ...}.
        stored = json.dumps([
            {"name": "search", "output": "python results"},
            {"name": "search", "output": "javascript results"},
            {"name": "calculator", "output": "4"},
        ])
        parsed = _make_thought(tool="search;search;calculator", observation=stored).tool_outputs_dict
        assert parsed["search"] == "python results"
        assert parsed["search__2"] == "javascript results"
        assert parsed["calculator"] == "4"

    def test_meta_array_roundtrip(self):
        """Data written as array for tool_meta is correctly parsed."""
        # Mirrors fc_agent_runner.py tool_invoke_meta entries: {"name": ..., "meta": ...}.
        stored = json.dumps([
            {"name": "search", "meta": {"time_cost": 1.5, "tool_config": {"tool_provider_type": "api"}}},
            {"name": "search", "meta": {"time_cost": 2.0, "tool_config": {"tool_provider_type": "api"}}},
            {"name": "calculator", "meta": {"time_cost": 0.1, "tool_config": {"tool_provider_type": "builtin"}}},
        ])
        parsed = _make_thought(tool="search;search;calculator", tool_meta_str=stored).tool_meta
        assert parsed["search"]["time_cost"] == 1.5
        assert parsed["search__2"]["time_cost"] == 2.0
        assert parsed["calculator"]["time_cost"] == 0.1

    def test_all_properties_have_consistent_ordinal_keys(self):
        """All three properties (inputs, outputs, meta) must produce the SAME ordinal keys."""
        thought = _make_thought(
            tool="search;search;calculator;search",
            tool_input=json.dumps([
                {"name": "search", "arguments": {"q": "a"}},
                {"name": "search", "arguments": {"q": "b"}},
                {"name": "calculator", "arguments": {"expr": "1+1"}},
                {"name": "search", "arguments": {"q": "c"}},
            ]),
            observation=json.dumps([
                {"name": "search", "output": "result_a"},
                {"name": "search", "output": "result_b"},
                {"name": "calculator", "output": "2"},
                {"name": "search", "output": "result_c"},
            ]),
            tool_meta_str=json.dumps([
                {"name": "search", "meta": {"time_cost": 1.0}},
                {"name": "search", "meta": {"time_cost": 2.0}},
                {"name": "calculator", "meta": {"time_cost": 0.5}},
                {"name": "search", "meta": {"time_cost": 3.0}},
            ]),
        )
        expected_keys = {"search", "search__2", "calculator", "search__3"}
        key_sets = [
            set(thought.tool_inputs_dict),
            set(thought.tool_outputs_dict),
            set(thought.tool_meta),
        ]
        assert key_sets[0] == key_sets[1] == key_sets[2] == expected_keys
class TestMixedFormatScenarios:
    """Test scenarios where formats might be mixed (e.g., crash during partial save)."""

    def test_array_input_with_dict_observation(self):
        """New array format input but old dict format observation."""
        thought = _make_thought(
            tool="search;calculator",
            tool_input=json.dumps([
                {"name": "search", "arguments": {"q": "test"}},
                {"name": "calculator", "arguments": {"expr": "1+1"}},
            ]),
            observation=json.dumps({"search": "found", "calculator": "2"}),
        )
        inputs = thought.tool_inputs_dict
        outputs = thought.tool_outputs_dict
        # Inputs use ordinal keys from array format
        assert "search" in inputs
        assert "calculator" in inputs
        # Outputs use old dict format (keyed by tool name directly)
        assert "search" in outputs
        assert "calculator" in outputs

    def test_dict_input_with_array_observation(self):
        """Old dict format input but new array format observation."""
        thought = _make_thought(
            tool="search;calculator",
            tool_input=json.dumps({"search": {"q": "test"}, "calculator": {"expr": "1+1"}}),
            observation=json.dumps([
                {"name": "search", "output": "found"},
                {"name": "calculator", "output": "2"},
            ]),
        )
        inputs = thought.tool_inputs_dict
        outputs = thought.tool_outputs_dict
        assert inputs["search"] == {"q": "test"}
        assert outputs["search"] == "found"

    def test_none_meta_with_array_input(self):
        """tool_meta_str is None/empty but tool_input is new array format."""
        thought = _make_thought(
            tool="search;search",
            tool_input=json.dumps([
                {"name": "search", "arguments": {"q": "a"}},
                {"name": "search", "arguments": {"q": "b"}},
            ]),
            tool_meta_str="",
        )
        inputs = thought.tool_inputs_dict
        meta = thought.tool_meta
        assert inputs["search"] == {"q": "a"}
        assert inputs["search__2"] == {"q": "b"}
        assert meta == {}  # Empty meta is fine

    def test_none_observation_with_array_input(self):
        """observation is None/empty but tool_input is new array format."""
        thought = _make_thought(
            tool="search;search",
            tool_input=json.dumps([
                {"name": "search", "arguments": {"q": "a"}},
                {"name": "search", "arguments": {"q": "b"}},
            ]),
            observation="",
        )
        outputs = thought.tool_outputs_dict
        # An empty observation falls back to {tool: {} for tool in tools}; the
        # duplicate tool names collapse into a single dict key, so only one
        # "search" entry remains.  (The previous assertion used a duplicate-key
        # dict literal, {"search": {}, "search": {}}, which Python silently
        # collapses to the same value — stating it directly is clearer.)
        assert outputs == {"search": {}}

    def test_array_input_with_plain_string_observation(self):
        """Array input but observation is a plain non-JSON string."""
        thought = _make_thought(
            tool="search",
            tool_input=json.dumps([{"name": "search", "arguments": {"q": "test"}}]),
            observation="plain text response",
        )
        outputs = thought.tool_outputs_dict
        # Plain string falls through to the except branch
        assert outputs == {"search": "plain text response"}
class TestAgentServiceOrdinalKeyAlignment:
    """
    Verify agent_service.py ordinal key lookup matches model.py property keys.
    This simulates the agent_service.py get_agent_logs iteration pattern.
    """

    def test_service_reads_all_duplicate_inputs(self):
        """agent_service.py can read each duplicate tool's input via ordinal keys."""
        thought = _make_thought(
            tool="search;search;calculator",
            tool_input=json.dumps([
                {"name": "search", "arguments": {"q": "python"}},
                {"name": "search", "arguments": {"q": "javascript"}},
                {"name": "calculator", "arguments": {"expr": "2+2"}},
            ]),
            observation=json.dumps([
                {"name": "search", "output": "python results"},
                {"name": "search", "output": "js results"},
                {"name": "calculator", "output": "4"},
            ]),
            tool_meta_str=json.dumps([
                {"name": "search", "meta": {"time_cost": 1.0, "tool_config": {"tool_provider_type": "api"}}},
                {"name": "search", "meta": {"time_cost": 1.5, "tool_config": {"tool_provider_type": "api"}}},
                {"name": "calculator", "meta": {"time_cost": 0.2, "tool_config": {"tool_provider_type": "builtin"}}},
            ]),
        )
        inputs = thought.tool_inputs_dict
        outputs = thought.tool_outputs_dict
        meta = thought.tool_meta
        # Replay the get_agent_logs loop: raw names in, ordinal keys out.
        seen: dict[str, int] = {}
        entries = []
        for name in thought.tools:
            nth = seen[name] = seen.get(name, 0) + 1
            key = name if nth == 1 else f"{name}__{nth}"
            entries.append({
                "name": name,
                "ordinal_key": key,
                "input": inputs.get(key, {}),
                "output": outputs.get(key, {}),
                "meta": meta.get(key, {}),
            })
        assert len(entries) == 3
        first, second, third = entries
        assert (first["name"], first["ordinal_key"]) == ("search", "search")
        assert first["input"] == {"q": "python"}
        assert first["output"] == "python results"
        assert first["meta"]["time_cost"] == 1.0
        assert (second["name"], second["ordinal_key"]) == ("search", "search__2")
        assert second["input"] == {"q": "javascript"}
        assert second["output"] == "js results"
        assert second["meta"]["time_cost"] == 1.5
        assert (third["name"], third["ordinal_key"]) == ("calculator", "calculator")
        assert third["input"] == {"expr": "2+2"}
        assert third["output"] == "4"
        assert third["meta"]["time_cost"] == 0.2
class TestOpsTraceManagerIssue:
    """
    Test for a known issue: ops_trace_manager.py tool_trace uses
    agent_thought.tool_meta.get(tool_name, {}) with the RAW tool_name,
    NOT the ordinal key. This means for duplicate tools, it always
    gets the FIRST occurrence's meta data.
    This is a PRE-EXISTING issue that was not introduced by the fix.
    """

    def test_ops_trace_uses_raw_name_not_ordinal(self):
        """Demonstrate that tool_meta.get(tool_name) only gets first occurrence."""
        thought = _make_thought(
            tool="search;search",
            tool_meta_str=json.dumps([
                {"name": "search", "meta": {"time_cost": 1.0}},
                {"name": "search", "meta": {"time_cost": 2.0}},
            ]),
        )
        parsed_meta = thought.tool_meta
        # Both ordinal keys are present after parsing.
        assert "search" in parsed_meta
        assert "search__2" in parsed_meta
        # ops_trace_manager.py (tool_trace, ~line 823) looks up by the raw tool
        # name only, so for duplicates it always sees the FIRST occurrence:
        raw_name = "search"
        assert parsed_meta.get(raw_name, {}) == {"time_cost": 1.0}
        # The second occurrence is reachable only via its ordinal key.
        assert parsed_meta.get("search__2", {}) == {"time_cost": 2.0}
class TestHistoryReconstructionPath:
    """Test the organize_agent_history path in base_agent_runner.py."""

    def test_array_format_history_reconstruction_data(self):
        """
        Verify the data structures that organize_agent_history would parse.
        The method checks isinstance(tool_inputs_parsed, list) to branch.
        """
        # Simulate what's stored in the DB for the new array format.
        stored_input = json.dumps([
            {"name": "search", "arguments": {"q": "python"}},
            {"name": "search", "arguments": {"q": "javascript"}},
        ])
        stored_observation = json.dumps([
            {"name": "search", "output": "python results"},
            {"name": "search", "output": "js results"},
        ])
        inputs = json.loads(stored_input)
        responses = json.loads(stored_observation)
        assert isinstance(inputs, list)
        # Every input entry carries a name and its arguments.
        for entry in inputs:
            assert isinstance(entry, dict)
            assert "name" in entry
            assert "arguments" in entry
        # Every response entry carries a name and its output.
        for entry in responses:
            assert isinstance(entry, dict)
            assert "name" in entry
            assert "output" in entry
        # Inputs and responses pair up positionally by name.
        assert [e["name"] for e in inputs] == [e["name"] for e in responses]

    def test_old_dict_format_history_reconstruction_data(self):
        """Verify old dict format is still handled correctly in history path."""
        stored_input = json.dumps({"search": {"q": "test"}, "calculator": {"expr": "1+1"}})
        stored_observation = json.dumps({"search": "found", "calculator": "2"})
        inputs = json.loads(stored_input)
        responses = json.loads(stored_observation)
        # Old format decodes to dicts, not lists.
        assert isinstance(inputs, dict)
        assert isinstance(responses, dict)
        # Dict format is keyed by the tool name directly.
        for name in ("search", "calculator"):
            assert name in inputs
            assert name in responses

    def test_array_input_with_list_observation_pairing(self):
        """Verify array format pairs input[i] with observation[i] by index."""
        call_args = [
            {"name": "search", "arguments": {"q": "a"}},
            {"name": "search", "arguments": {"q": "b"}},
            {"name": "calculator", "arguments": {"expr": "1+1"}},
        ]
        call_outputs = [
            {"name": "search", "output": "result_a"},
            {"name": "search", "output": "result_b"},
            {"name": "calculator", "output": "2"},
        ]
        # Index-based pairing: entry i of inputs matches entry i of outputs.
        for sent, received in zip(call_args, call_outputs):
            assert sent["name"] == received["name"]
class TestSSEPipelineFormat:
    """
    Verify SSE pipeline sends tool_input as raw JSON string.
    The frontend must parse this JSON string.
    """

    def test_sse_sends_raw_tool_input_string(self):
        """
        easy_ui_based_generate_task_pipeline.py line 579 sends:
        tool_input=agent_thought.tool_input
        This is the RAW JSON string, not the parsed dict.
        The frontend receives either:
        - Old format: '{"search": {"q": "test"}}'
        - New format: '[{"name": "search", "arguments": {"q": "test"}}]'
        Both are valid JSON strings.
        """
        raw = json.dumps([
            {"name": "search", "arguments": {"q": "test"}},
            {"name": "search", "arguments": {"q": "test2"}},
        ])
        thought = _make_thought(tool="search;search", tool_input=raw)
        # The pipeline forwards the attribute untouched, so it stays a string.
        assert isinstance(thought.tool_input, str)
        # The frontend must be able to decode it back into the two-entry array.
        decoded = json.loads(thought.tool_input)
        assert isinstance(decoded, list)
        assert len(decoded) == 2

    def test_sse_sends_raw_observation_string(self):
        """SSE sends observation directly as string."""
        raw = json.dumps([
            {"name": "search", "output": "result1"},
            {"name": "search", "output": "result2"},
        ])
        thought = _make_thought(tool="search;search", observation=raw)
        assert isinstance(thought.observation, str)
        decoded = json.loads(thought.observation)
        assert isinstance(decoded, list)

View File

@ -0,0 +1,698 @@
"""Edge case tests for duplicate tool name fix in MessageAgentThought.
Focuses on boundary conditions, malformed inputs, and tricky scenarios
that the basic test suite does not cover.
"""
import json
import pytest
from models.model import MessageAgentThought
def _make_thought(**kwargs) -> MessageAgentThought:
    """Build a MessageAgentThought, filling in the required constructor fields."""
    params = dict(
        message_id="msg-1",
        position=1,
        created_by_role="account",
        created_by="user-1",
    )
    # Caller-supplied fields win over the defaults.
    params.update(kwargs)
    return MessageAgentThought(**params)
# ===================================================================
# tool_inputs_dict edge cases
# ===================================================================
class TestToolInputsDictEdgeCases:
"""Edge cases for tool_inputs_dict property."""
def test_array_longer_than_tools_list(self):
"""Array has more items than the semicolon-separated tool list.
Extra items should use fallback name 'tool_N'.
"""
thought = _make_thought(
tool="search",
tool_input=json.dumps([
{"name": "search", "arguments": {"q": "a"}},
{"name": "extra_tool", "arguments": {"q": "b"}},
]),
)
result = thought.tool_inputs_dict
assert result["search"] == {"q": "a"}
assert result["extra_tool"] == {"q": "b"}
def test_array_shorter_than_tools_list(self):
"""Array has fewer items than tools. Missing tools get no entry."""
thought = _make_thought(
tool="search;calculator;weather",
tool_input=json.dumps([
{"name": "search", "arguments": {"q": "test"}},
]),
)
result = thought.tool_inputs_dict
assert result == {"search": {"q": "test"}}
def test_interleaved_duplicates(self):
"""Pattern: A, B, A, B - tests that ordinal tracking is per-name."""
thought = _make_thought(
tool="search;calc;search;calc",
tool_input=json.dumps([
{"name": "search", "arguments": {"q": "first"}},
{"name": "calc", "arguments": {"expr": "1+1"}},
{"name": "search", "arguments": {"q": "second"}},
{"name": "calc", "arguments": {"expr": "2+2"}},
]),
)
result = thought.tool_inputs_dict
assert result["search"] == {"q": "first"}
assert result["search__2"] == {"q": "second"}
assert result["calc"] == {"expr": "1+1"}
assert result["calc__2"] == {"expr": "2+2"}
def test_many_duplicates_ordinal_keys(self):
"""10 identical tool names should produce search, search__2, ... search__10."""
items = [{"name": "search", "arguments": {"q": f"query_{i}"}} for i in range(10)]
thought = _make_thought(
tool=";".join(["search"] * 10),
tool_input=json.dumps(items),
)
result = thought.tool_inputs_dict
assert result["search"] == {"q": "query_0"}
for i in range(1, 10):
assert result[f"search__{i + 1}"] == {"q": f"query_{i}"}
assert len(result) == 10
def test_tool_name_with_double_underscore_collision(self):
"""Tool name 'search__2' already exists; duplicate 'search' should
produce key 'search__2' which collides. Verify the behavior.
"""
thought = _make_thought(
tool="search__2;search;search",
tool_input=json.dumps([
{"name": "search__2", "arguments": {"q": "explicit"}},
{"name": "search", "arguments": {"q": "first"}},
{"name": "search", "arguments": {"q": "second"}},
]),
)
result = thought.tool_inputs_dict
# The first entry for "search__2" is stored first.
# Then "search" (1st occurrence) gets key "search".
# Then "search" (2nd occurrence) gets key "search__2" which COLLIDES
# with the explicit "search__2" entry.
# Current implementation will overwrite the first "search__2" value.
# This is a known edge case - documenting actual behavior.
assert "search__2" in result
assert "search" in result
def test_non_dict_items_in_array(self):
    """Non-dict array entries (string, int, None) take their name from the
    tool list and fall back to an empty argument dict.
    """
    parsed = _make_thought(
        tool="search;calc;weather",
        tool_input=json.dumps(["just a string", 42, None]),
    ).tool_inputs_dict
    for tool_name in ("search", "calc", "weather"):
        assert parsed[tool_name] == {}
def test_mixed_named_and_unnamed_items(self):
    """Some array entries carry a 'name' key while others do not."""
    entries = [
        {"name": "search", "arguments": {"q": "test"}},
        {"expr": "1+1"},  # no 'name' key -> falls back to tools[1]
        {"name": "weather", "arguments": {"city": "NYC"}},
    ]
    parsed = _make_thought(
        tool="search;calculator;weather",
        tool_input=json.dumps(entries),
    ).tool_inputs_dict
    assert parsed["search"] == {"q": "test"}
    # An unnamed dict without an "arguments" key is used verbatim as args.
    assert parsed["calculator"] == {"expr": "1+1"}
    assert parsed["weather"] == {"city": "NYC"}
def test_empty_array(self):
    """An empty JSON array parses to an empty mapping."""
    record = _make_thought(tool="search", tool_input=json.dumps([]))
    assert record.tool_inputs_dict == {}
def test_tool_none(self):
    """A None tool field yields an empty tools list, hence an empty result."""
    record = _make_thought(tool=None, tool_input=json.dumps({"q": "test"}))
    # With no declared tools, a dict payload cannot be attributed to anything.
    assert record.tool_inputs_dict == {}
def test_tool_input_is_none(self):
    """A None tool_input maps every declared tool to an empty dict."""
    record = _make_thought(tool="search;calculator", tool_input=None)
    assert record.tool_inputs_dict == {"search": {}, "calculator": {}}
def test_json_primitive_string(self):
    """A JSON string primitive with exactly one tool is wrapped under it."""
    record = _make_thought(tool="search", tool_input=json.dumps("hello world"))
    # Not a list or dict: a single tool claims the raw value.
    assert record.tool_inputs_dict == {"search": "hello world"}
def test_json_primitive_number(self):
    """A JSON number primitive is wrapped under the single tool name."""
    record = _make_thought(tool="calc", tool_input=json.dumps(42))
    assert record.tool_inputs_dict == {"calc": 42}
def test_json_primitive_null(self):
    """JSON null with a single tool maps that tool to None."""
    record = _make_thought(tool="search", tool_input=json.dumps(None))
    # None is neither list nor dict; the lone tool receives it as-is.
    assert record.tool_inputs_dict == {"search": None}
def test_json_primitive_with_multiple_tools(self):
    """A JSON primitive cannot be attributed when several tools exist."""
    record = _make_thought(tool="search;calc", tool_input=json.dumps("hello"))
    assert record.tool_inputs_dict == {}
def test_unicode_tool_names(self):
    """Non-ASCII tool names work in both the tool field and array items."""
    entries = [
        {"name": "búsqueda", "arguments": {"q": "primero"}},
        {"name": "búsqueda", "arguments": {"q": "segundo"}},
    ]
    parsed = _make_thought(
        tool="búsqueda;búsqueda",
        tool_input=json.dumps(entries),
    ).tool_inputs_dict
    assert parsed["búsqueda"] == {"q": "primero"}
    assert parsed["búsqueda__2"] == {"q": "segundo"}
def test_tool_with_semicolons_in_name(self):
    """';' is the tool separator, so a name containing one gets split."""
    # "my;tool" is stored as two tools ["my", "tool"], not one name.
    record = _make_thought(
        tool="my;tool",
        tool_input=json.dumps({"my": {"a": 1}, "tool": {"b": 2}}),
    )
    assert record.tool_inputs_dict == {"my": {"a": 1}, "tool": {"b": 2}}
def test_whitespace_in_tool_names(self):
    """split(';') keeps surrounding whitespace in each tool name."""
    parsed = _make_thought(
        tool=" search ; calc ",
        tool_input=json.dumps({" search ": {"q": "test"}, " calc ": {"expr": "1+1"}}),
    ).tool_inputs_dict
    assert parsed[" search "] == {"q": "test"}
    assert parsed[" calc "] == {"expr": "1+1"}
def test_old_format_single_tool_nested_dict(self):
    """Old format, single tool, and the payload contains a key equal to the
    tool name: only that key's value is kept; sibling keys are dropped.
    """
    record = _make_thought(
        tool="search",
        tool_input=json.dumps({"search": "my query", "limit": 10}),
    )
    # Because "search" is a key of the payload, the key lookup wins and the
    # "limit" entry is lost.
    assert record.tool_inputs_dict["search"] == "my query"
def test_old_format_single_tool_no_key_match(self):
    """Old format, single tool, no key matching the tool name: the whole
    dict becomes that tool's input.
    """
    record = _make_thought(
        tool="search",
        tool_input=json.dumps({"query": "test", "limit": 5}),
    )
    assert record.tool_inputs_dict["search"] == {"query": "test", "limit": 5}
def test_empty_name_in_array_items(self):
    """Array items whose 'name' is the empty string still deduplicate."""
    entries = [
        {"name": "", "arguments": {"q": "test"}},
        {"name": "", "arguments": {"expr": "1+1"}},
    ]
    parsed = _make_thought(
        tool="search;calc",
        tool_input=json.dumps(entries),
    ).tool_inputs_dict
    # Duplicate "" names become "" and "__2".
    assert parsed[""] == {"q": "test"}
    assert parsed["__2"] == {"expr": "1+1"}
def test_array_item_name_mismatch_with_tools(self):
    """Names inside the array win over the tool list when they disagree."""
    entries = [
        {"name": "foo", "arguments": {"a": 1}},
        {"name": "bar", "arguments": {"b": 2}},
    ]
    record = _make_thought(tool="search;calculator", tool_input=json.dumps(entries))
    assert record.tool_inputs_dict == {"foo": {"a": 1}, "bar": {"b": 2}}
def test_arguments_is_none(self):
    """An explicit None under 'arguments' is preserved, not defaulted."""
    record = _make_thought(
        tool="search",
        tool_input=json.dumps([{"name": "search", "arguments": None}]),
    )
    # .get("arguments", {}) returns None because the key is present.
    assert record.tool_inputs_dict["search"] is None
def test_arguments_is_empty_dict(self):
    """An empty 'arguments' dict round-trips unchanged."""
    record = _make_thought(
        tool="search",
        tool_input=json.dumps([{"name": "search", "arguments": {}}]),
    )
    assert record.tool_inputs_dict["search"] == {}
def test_arguments_key_missing(self):
    """A named item with no 'arguments' key defaults to an empty dict."""
    record = _make_thought(
        tool="search",
        tool_input=json.dumps([{"name": "search"}]),
    )
    assert record.tool_inputs_dict["search"] == {}
def test_deeply_nested_arguments(self):
    """Deeply nested argument structures survive the JSON round trip."""
    nested = {"level1": {"level2": {"level3": [1, 2, {"level4": True}]}}}
    record = _make_thought(
        tool="search",
        tool_input=json.dumps([{"name": "search", "arguments": nested}]),
    )
    assert record.tool_inputs_dict["search"] == nested
# ===================================================================
# tool_outputs_dict edge cases
# ===================================================================
class TestToolOutputsDictEdgeCases:
    """Edge cases for the tool_outputs_dict property."""

    def test_array_longer_than_tools_list(self):
        """Extra array entries beyond the tool list keep their own names."""
        entries = [
            {"name": "search", "output": "result1"},
            {"name": "extra", "output": "result2"},
        ]
        outputs = _make_thought(tool="search", observation=json.dumps(entries)).tool_outputs_dict
        assert outputs["search"] == "result1"
        assert outputs["extra"] == "result2"

    def test_interleaved_duplicates(self):
        """Pattern A, B, A, B: ordinal suffixes are tracked per name."""
        entries = [
            {"name": "search", "output": "s1"},
            {"name": "calc", "output": "c1"},
            {"name": "search", "output": "s2"},
            {"name": "calc", "output": "c2"},
        ]
        outputs = _make_thought(
            tool="search;calc;search;calc",
            observation=json.dumps(entries),
        ).tool_outputs_dict
        for key, expected in [
            ("search", "s1"),
            ("search__2", "s2"),
            ("calc", "c1"),
            ("calc__2", "c2"),
        ]:
            assert outputs[key] == expected

    def test_non_string_observation_fallback(self):
        """An observation that is not valid JSON is replicated per tool."""
        raw = "raw error text from tool"
        outputs = _make_thought(tool="search;calc", observation=raw).tool_outputs_dict
        assert outputs == {"search": raw, "calc": raw}

    def test_observation_is_none(self):
        """A None observation maps every tool to an empty dict."""
        outputs = _make_thought(tool="search;calc", observation=None).tool_outputs_dict
        assert outputs == {"search": {}, "calc": {}}

    def test_output_key_missing(self):
        """A named item without an 'output' key defaults to an empty string."""
        outputs = _make_thought(
            tool="search",
            observation=json.dumps([{"name": "search"}]),
        ).tool_outputs_dict
        assert outputs["search"] == ""

    def test_output_is_complex_object(self):
        """Structured outputs survive the JSON round trip untouched."""
        payload = {"data": [1, 2, 3], "metadata": {"count": 3}}
        outputs = _make_thought(
            tool="api_call",
            observation=json.dumps([{"name": "api_call", "output": payload}]),
        ).tool_outputs_dict
        assert outputs["api_call"] == payload

    def test_empty_array_observation(self):
        """An empty JSON array observation yields an empty mapping."""
        record = _make_thought(tool="search", observation=json.dumps([]))
        assert record.tool_outputs_dict == {}

    def test_json_primitive_string_observation_single_tool(self):
        """A JSON string with exactly one tool is wrapped under that tool."""
        record = _make_thought(tool="search", observation=json.dumps("found 5 results"))
        assert record.tool_outputs_dict == {"search": "found 5 results"}

    def test_json_primitive_string_observation_multi_tool(self):
        """A JSON string cannot be attributed among several tools."""
        record = _make_thought(tool="search;calc", observation=json.dumps("some result"))
        assert record.tool_outputs_dict == {}

    def test_many_duplicates_in_outputs(self):
        """Five duplicates produce api, api__2, ..., api__5."""
        entries = [{"name": "api", "output": f"response_{i}"} for i in range(5)]
        outputs = _make_thought(
            tool=";".join(["api"] * 5),
            observation=json.dumps(entries),
        ).tool_outputs_dict
        assert outputs["api"] == "response_0"
        for ordinal in range(2, 6):
            assert outputs[f"api__{ordinal}"] == f"response_{ordinal - 1}"

    def test_old_format_single_tool_observation_key_match(self):
        """Old dict format: a key equal to the tool name wins the lookup."""
        outputs = _make_thought(
            tool="search",
            observation=json.dumps({"search": "the result", "extra": "ignored"}),
        ).tool_outputs_dict
        assert outputs["search"] == "the result"
# ===================================================================
# tool_meta edge cases
# ===================================================================
class TestToolMetaEdgeCases:
    """Edge cases for the tool_meta property."""

    def test_array_longer_than_tools_list(self):
        """Extra named entries beyond the tool list keep their own names."""
        entries = [
            {"name": "search", "meta": {"time": 1.0}},
            {"name": "extra", "meta": {"time": 2.0}},
        ]
        meta = _make_thought(tool="search", tool_meta_str=json.dumps(entries)).tool_meta
        assert meta["search"] == {"time": 1.0}
        assert meta["extra"] == {"time": 2.0}

    def test_interleaved_duplicates(self):
        """Pattern A, B, A, B: ordinal suffixes are tracked per name."""
        entries = [
            {"name": "search", "meta": {"cost": 0.1}},
            {"name": "calc", "meta": {"cost": 0.2}},
            {"name": "search", "meta": {"cost": 0.3}},
            {"name": "calc", "meta": {"cost": 0.4}},
        ]
        meta = _make_thought(
            tool="search;calc;search;calc",
            tool_meta_str=json.dumps(entries),
        ).tool_meta
        for key, expected in [
            ("search", {"cost": 0.1}),
            ("search__2", {"cost": 0.3}),
            ("calc", {"cost": 0.2}),
            ("calc__2", {"cost": 0.4}),
        ]:
            assert meta[key] == expected

    def test_non_dict_meta_items(self):
        """Non-dict entries are named from the tool list with empty meta."""
        meta = _make_thought(
            tool="search;calc;weather",
            tool_meta_str=json.dumps(["not a dict", 42, None]),
        ).tool_meta
        for tool_name in ("search", "calc", "weather"):
            assert meta[tool_name] == {}

    def test_meta_key_missing_in_named_item(self):
        """A named item without a 'meta' key defaults to an empty dict."""
        meta = _make_thought(
            tool="search",
            tool_meta_str=json.dumps([{"name": "search"}]),
        ).tool_meta
        assert meta["search"] == {}

    def test_meta_is_none_in_named_item(self):
        """An explicit None under 'meta' is preserved as-is."""
        meta = _make_thought(
            tool="search",
            tool_meta_str=json.dumps([{"name": "search", "meta": None}]),
        ).tool_meta
        assert meta["search"] is None

    def test_json_null_meta_str(self):
        """The literal string 'null' decodes to None and yields {}."""
        record = _make_thought(tool="search", tool_meta_str="null")
        assert record.tool_meta == {}

    def test_json_array_of_arrays(self):
        """Nested arrays are not the expected shape: names come from the
        tool list and the meta defaults to an empty dict.
        """
        meta = _make_thought(
            tool="search;calc",
            tool_meta_str=json.dumps([[1, 2], [3, 4]]),
        ).tool_meta
        assert meta["search"] == {}
        assert meta["calc"] == {}

    def test_tool_meta_str_is_empty_string(self):
        """An empty tool_meta_str yields an empty mapping."""
        record = _make_thought(tool="search", tool_meta_str="")
        assert record.tool_meta == {}

    def test_fallback_name_tool_index_out_of_range(self):
        """Unnamed items past the end of the tool list get tool_<i> names."""
        entries = [{"time": 1.0}, {"time": 2.0}, {"time": 3.0}]
        meta = _make_thought(tool="search", tool_meta_str=json.dumps(entries)).tool_meta
        # Index 0 maps onto the only declared tool; indexes 1 and 2 fall
        # back to synthetic "tool_<i>" names.
        assert meta["search"] == {"time": 1.0}
        assert meta["tool_1"] == {"time": 2.0}
        assert meta["tool_2"] == {"time": 3.0}
# ===================================================================
# tools property edge cases
# ===================================================================
class TestToolsPropertyEdgeCases:
    """Edge cases for the tools property (plain ';' split semantics)."""

    def test_tool_with_trailing_semicolon(self):
        """A trailing separator produces an empty final element."""
        record = _make_thought(
            tool="search;",
            tool_input=json.dumps({"search": {"q": "test"}}),
        )
        assert record.tools == ["search", ""]

    def test_tool_with_leading_semicolon(self):
        """A leading separator produces an empty first element."""
        assert _make_thought(tool=";search").tools == ["", "search"]

    def test_tool_with_multiple_semicolons(self):
        """Consecutive separators produce empty middle elements."""
        assert _make_thought(tool="search;;calc").tools == ["search", "", "calc"]

    def test_tool_single_semicolon(self):
        """A lone separator splits into two empty strings."""
        assert _make_thought(tool=";").tools == ["", ""]
# ===================================================================
# Cross-property consistency edge cases
# ===================================================================
class TestCrossPropertyConsistency:
    """tool_inputs_dict, tool_outputs_dict and tool_meta should expose the
    same key set for the same tool configuration.
    """

    def test_same_keys_for_matching_inputs_and_outputs(self):
        """Same names in the same order -> identical deduplicated key sets."""
        record = _make_thought(
            tool="search;search;calc",
            tool_input=json.dumps([
                {"name": "search", "arguments": {"q": "a"}},
                {"name": "search", "arguments": {"q": "b"}},
                {"name": "calc", "arguments": {"expr": "1+1"}},
            ]),
            observation=json.dumps([
                {"name": "search", "output": "r1"},
                {"name": "search", "output": "r2"},
                {"name": "calc", "output": "2"},
            ]),
            tool_meta_str=json.dumps([
                {"name": "search", "meta": {"t": 1}},
                {"name": "search", "meta": {"t": 2}},
                {"name": "calc", "meta": {"t": 3}},
            ]),
        )
        input_keys = set(record.tool_inputs_dict.keys())
        output_keys = set(record.tool_outputs_dict.keys())
        meta_keys = set(record.tool_meta.keys())
        assert input_keys == output_keys == meta_keys
        assert input_keys == {"search", "search__2", "calc"}

    def test_mixed_old_and_new_formats_across_properties(self):
        """New array inputs alongside old dict outputs/meta - a realistic
        mid-migration scenario.
        """
        record = _make_thought(
            tool="search;calc",
            tool_input=json.dumps([
                {"name": "search", "arguments": {"q": "test"}},
                {"name": "calc", "arguments": {"expr": "1+1"}},
            ]),
            # Legacy dict format for observation and meta.
            observation=json.dumps({"search": "result", "calc": "2"}),
            tool_meta_str=json.dumps({"search": {"t": 1}, "calc": {"t": 2}}),
        )
        assert record.tool_inputs_dict == {"search": {"q": "test"}, "calc": {"expr": "1+1"}}
        assert record.tool_outputs_dict == {"search": "result", "calc": "2"}
        assert record.tool_meta == {"search": {"t": 1}, "calc": {"t": 2}}

View File

@ -0,0 +1,242 @@
"""Tests for duplicate tool name fix - array format support in MessageAgentThought."""
import json
import pytest
from models.model import MessageAgentThought
def _make_thought(**overrides) -> MessageAgentThought:
    """Build a MessageAgentThought pre-filled with the minimal required
    fields; keyword overrides replace or extend the defaults.
    """
    fields = {
        "message_id": "msg-1",
        "position": 1,
        "created_by_role": "account",
        "created_by": "user-1",
        **overrides,
    }
    return MessageAgentThought(**fields)
class TestToolInputsDict:
    """tool_inputs_dict with both the legacy dict and new array formats."""

    def test_new_array_format_no_duplicates(self):
        """Unique names in array format produce a plain mapping."""
        record = _make_thought(
            tool="search;calculator",
            tool_input=json.dumps([
                {"name": "search", "arguments": {"q": "test"}},
                {"name": "calculator", "arguments": {"expr": "1+1"}},
            ]),
        )
        assert record.tool_inputs_dict == {"search": {"q": "test"}, "calculator": {"expr": "1+1"}}

    def test_new_array_format_with_duplicates(self):
        """Duplicate names in array format get ordinal __N keys."""
        parsed = _make_thought(
            tool="search;search;calculator",
            tool_input=json.dumps([
                {"name": "search", "arguments": {"q": "python"}},
                {"name": "search", "arguments": {"q": "javascript"}},
                {"name": "calculator", "arguments": {"expr": "2+2"}},
            ]),
        ).tool_inputs_dict
        assert parsed["search"] == {"q": "python"}
        assert parsed["search__2"] == {"q": "javascript"}
        assert parsed["calculator"] == {"expr": "2+2"}

    def test_new_array_format_triple_duplicates(self):
        """Three identical names yield search, search__2, search__3."""
        entries = [{"name": "search", "arguments": {"q": q}} for q in ("a", "b", "c")]
        parsed = _make_thought(
            tool="search;search;search",
            tool_input=json.dumps(entries),
        ).tool_inputs_dict
        assert parsed["search"] == {"q": "a"}
        assert parsed["search__2"] == {"q": "b"}
        assert parsed["search__3"] == {"q": "c"}

    def test_old_dict_format_backward_compat(self):
        """Legacy multi-tool dict format still parses unchanged."""
        payload = {"search": {"q": "test"}, "calculator": {"expr": "1+1"}}
        record = _make_thought(tool="search;calculator", tool_input=json.dumps(payload))
        assert record.tool_inputs_dict == payload

    def test_old_dict_format_single_tool(self):
        """Legacy single tool: the payload IS the arguments dict."""
        record = _make_thought(tool="search", tool_input=json.dumps({"q": "test"}))
        assert record.tool_inputs_dict == {"search": {"q": "test"}}

    def test_empty_tool_input(self):
        """Empty tool_input maps each declared tool to an empty dict."""
        record = _make_thought(tool="search;calculator", tool_input="")
        assert record.tool_inputs_dict == {"search": {}, "calculator": {}}

    def test_malformed_json(self):
        """Unparseable tool_input yields an empty mapping."""
        record = _make_thought(tool="search", tool_input="not json")
        assert record.tool_inputs_dict == {}

    def test_no_tool(self):
        """No tool and no input yields an empty mapping."""
        record = _make_thought(tool="", tool_input="")
        assert record.tool_inputs_dict == {}

    def test_array_without_name_field_uses_tools_list(self):
        """Unnamed array items take their names from the tool list."""
        parsed = _make_thought(
            tool="search;calculator",
            tool_input=json.dumps([{"q": "test"}, {"expr": "1+1"}]),
        ).tool_inputs_dict
        assert parsed == {"search": {"q": "test"}, "calculator": {"expr": "1+1"}}
class TestToolOutputsDict:
    """tool_outputs_dict with both legacy dict and new array formats."""

    def test_new_array_format_with_duplicates(self):
        """Duplicate names in array observations get ordinal keys."""
        outputs = _make_thought(
            tool="search;search;calculator",
            observation=json.dumps([
                {"name": "search", "output": "result1"},
                {"name": "search", "output": "result2"},
                {"name": "calculator", "output": "4"},
            ]),
        ).tool_outputs_dict
        assert outputs["search"] == "result1"
        assert outputs["search__2"] == "result2"
        assert outputs["calculator"] == "4"

    def test_new_array_format_no_duplicates(self):
        """Unique names in array format produce a plain mapping."""
        outputs = _make_thought(
            tool="search;calculator",
            observation=json.dumps([
                {"name": "search", "output": "found it"},
                {"name": "calculator", "output": "42"},
            ]),
        ).tool_outputs_dict
        assert outputs == {"search": "found it", "calculator": "42"}

    def test_old_dict_format(self):
        """Legacy dict observations still parse unchanged."""
        payload = {"search": "found", "calculator": "42"}
        record = _make_thought(tool="search;calculator", observation=json.dumps(payload))
        assert record.tool_outputs_dict == payload

    def test_empty_observation(self):
        """Empty observation maps each declared tool to an empty dict."""
        record = _make_thought(tool="search", observation="")
        assert record.tool_outputs_dict == {"search": {}}

    def test_old_dict_single_tool(self):
        """Legacy single tool: the full dict is that tool's output."""
        record = _make_thought(
            tool="search",
            observation=json.dumps({"results": ["a", "b"]}),
        )
        assert record.tool_outputs_dict == {"search": {"results": ["a", "b"]}}

    def test_array_without_name_field(self):
        """Unnamed array items take their names from the tool list."""
        outputs = _make_thought(
            tool="search;calculator",
            observation=json.dumps(["result from search", "result from calc"]),
        ).tool_outputs_dict
        assert outputs == {"search": "result from search", "calculator": "result from calc"}
class TestToolMeta:
    """tool_meta with both legacy dict and new array formats."""

    def test_new_array_format_with_duplicates(self):
        """Duplicate names in array meta get ordinal keys."""
        meta = _make_thought(
            tool="search;search",
            tool_meta_str=json.dumps([
                {"name": "search", "meta": {"time_cost": 1.5}},
                {"name": "search", "meta": {"time_cost": 2.0}},
            ]),
        ).tool_meta
        assert meta["search"] == {"time_cost": 1.5}
        assert meta["search__2"] == {"time_cost": 2.0}

    def test_new_array_format_no_duplicates(self):
        """Unique names in array format produce a plain mapping."""
        meta = _make_thought(
            tool="search;calculator",
            tool_meta_str=json.dumps([
                {"name": "search", "meta": {"time_cost": 1.5}},
                {"name": "calculator", "meta": {"time_cost": 0.3}},
            ]),
        ).tool_meta
        assert meta == {"search": {"time_cost": 1.5}, "calculator": {"time_cost": 0.3}}

    def test_old_dict_format(self):
        """Legacy dict meta still parses unchanged."""
        payload = {"search": {"time_cost": 1.5}, "calculator": {"time_cost": 2.0}}
        record = _make_thought(tool="search;calculator", tool_meta_str=json.dumps(payload))
        assert record.tool_meta == payload

    def test_empty_meta(self):
        """Empty tool_meta_str yields an empty mapping."""
        record = _make_thought(tool="search", tool_meta_str="")
        assert record.tool_meta == {}

    def test_malformed_json(self):
        """Unparseable tool_meta_str yields an empty mapping."""
        record = _make_thought(tool="search", tool_meta_str="not json")
        assert record.tool_meta == {}

    def test_array_without_name_field(self):
        """Unnamed array items take their names from the tool list."""
        meta = _make_thought(
            tool="search;calculator",
            tool_meta_str=json.dumps([{"time_cost": 1.5}, {"time_cost": 0.3}]),
        ).tool_meta
        assert meta == {"search": {"time_cost": 1.5}, "calculator": {"time_cost": 0.3}}

View File

@ -0,0 +1,410 @@
"""
Security audit tests for the duplicate tool name fix and related changes.
This module tests for potential security vulnerabilities introduced or
addressed by the changes in the fix/duplicate-tool-name branch.
"""
import sys
from collections.abc import Mapping
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# SEC-01: _extract_usage_dict recursive depth bomb (Medium)
#
# Both MCPTool._extract_usage_dict and WorkflowTool._extract_usage_dict
# recurse into arbitrarily nested Mapping / list structures from external
# MCP server responses. A malicious MCP server can craft a deeply nested
# payload that causes a RecursionError (stack overflow), crashing the
# worker process.
# ---------------------------------------------------------------------------
class TestExtractUsageDictRecursionDepth:
    """SEC-01: deeply nested payloads must not crash the worker process."""

    @staticmethod
    def _build_deeply_nested_payload(depth: int) -> dict[str, Any]:
        """Wrap a usage dict in *depth* layers of {"nested": ...}."""
        payload: dict[str, Any] = {"usage": {"total_tokens": 42}}
        for _ in range(depth):
            payload = {"nested": payload}
        return payload

    def test_mcp_tool_deep_nesting_does_not_crash(self) -> None:
        """SEC-01a: 500 nesting levels (below the default interpreter limit)
        must return the embedded usage dict without raising RecursionError.
        """
        from core.tools.mcp_tool.tool import MCPTool

        extracted = MCPTool._extract_usage_dict(self._build_deeply_nested_payload(500))
        assert extracted is not None
        assert extracted.get("total_tokens") == 42

    def test_mcp_tool_very_deep_nesting_hits_recursion_limit(self) -> None:
        """SEC-01b: nesting past sys.getrecursionlimit() DOES raise
        RecursionError - this documents the unbounded-recursion vulnerability.
        """
        from core.tools.mcp_tool.tool import MCPTool

        payload = self._build_deeply_nested_payload(sys.getrecursionlimit() + 100)
        with pytest.raises(RecursionError):
            MCPTool._extract_usage_dict(payload)

    def test_workflow_tool_deep_nesting_does_not_crash(self) -> None:
        """SEC-01c: same moderate-depth check for WorkflowTool."""
        from core.tools.workflow_as_tool.tool import WorkflowTool

        extracted = WorkflowTool._extract_usage_dict(self._build_deeply_nested_payload(500))
        assert extracted is not None
        assert extracted.get("total_tokens") == 42

    def test_workflow_tool_very_deep_nesting_hits_recursion_limit(self) -> None:
        """SEC-01d: same recursion-limit demonstration for WorkflowTool."""
        from core.tools.workflow_as_tool.tool import WorkflowTool

        payload = self._build_deeply_nested_payload(sys.getrecursionlimit() + 100)
        with pytest.raises(RecursionError):
            WorkflowTool._extract_usage_dict(payload)
# ---------------------------------------------------------------------------
# SEC-02: MCP usage extraction trusts external data without validation (Low)
#
# _extract_usage_dict blindly returns whatever Mapping it finds under a
# "usage" key. If the MCP server supplies non-numeric values for fields
# like total_tokens, the downstream LLMUsage.from_metadata may behave
# unexpectedly. This is a data-integrity / low-severity issue.
# ---------------------------------------------------------------------------
class TestMCPUsageDataIntegrity:
    """SEC-02: malformed usage data from MCP servers passes through unchecked."""

    def test_extract_usage_dict_returns_arbitrary_keys(self) -> None:
        """SEC-02a: arbitrary (even dunder-looking) keys are not filtered,
        so a malicious server can inject any key into the usage dict.
        """
        from core.tools.mcp_tool.tool import MCPTool

        usage: dict[str, Any] = {
            "total_tokens": 100,
            "malicious_key": "evil_value",
            "__class__": "should_not_be_here",
        }
        extracted = MCPTool._extract_usage_dict({"usage": usage})
        assert extracted is not None
        # The raw dict comes back without any key filtering.
        assert "malicious_key" in extracted
        assert "__class__" in extracted

    def test_extract_usage_dict_non_numeric_token_values(self) -> None:
        """SEC-02b: non-numeric token counts are returned without validation,
        leaving downstream consumers to cope with them.
        """
        from core.tools.mcp_tool.tool import MCPTool

        extracted = MCPTool._extract_usage_dict(
            {"usage": {"total_tokens": "not_a_number", "prompt_tokens": {"nested": "object"}}}
        )
        assert extracted is not None
        assert extracted["total_tokens"] == "not_a_number"
# ---------------------------------------------------------------------------
# SEC-03: Human Input node bypass check (Info - Positive Security Test)
#
# Verify that ensure_no_human_input_nodes correctly rejects workflows
# containing human-input nodes, and cannot be bypassed with variations.
# ---------------------------------------------------------------------------
class TestHumanInputNodeBypass:
    """SEC-03: human-input node detection must not be bypassable."""

    @staticmethod
    def _graph(*node_types: str) -> dict[str, Any]:
        """Build a minimal workflow graph containing the given node types."""
        return {"nodes": [{"data": {"type": node_type}} for node_type in node_types]}

    def test_blocks_human_input_node(self) -> None:
        """SEC-03a: a standard human-input node raises the dedicated error."""
        from core.tools.errors import WorkflowToolHumanInputNotSupportedError
        from core.tools.utils.workflow_configuration_sync import WorkflowToolConfigurationUtils

        graph = self._graph("start", "human-input", "end")
        with pytest.raises(WorkflowToolHumanInputNotSupportedError):
            WorkflowToolConfigurationUtils.ensure_no_human_input_nodes(graph)

    def test_allows_workflow_without_human_input(self) -> None:
        """SEC-03b: a workflow without human-input nodes passes validation."""
        from core.tools.utils.workflow_configuration_sync import WorkflowToolConfigurationUtils

        WorkflowToolConfigurationUtils.ensure_no_human_input_nodes(self._graph("start", "llm", "end"))

    def test_case_sensitivity_not_bypassed(self) -> None:
        """SEC-03c: spelling variants are NOT equal to the exact enum value
        'human-input' and are therefore allowed through.
        """
        from core.tools.utils.workflow_configuration_sync import WorkflowToolConfigurationUtils

        for variant in ["Human-Input", "HUMAN-INPUT", "human_input", "humaninput"]:
            # None of these raise because they do not match exactly.
            WorkflowToolConfigurationUtils.ensure_no_human_input_nodes(self._graph(variant))

    def test_empty_graph_is_safe(self) -> None:
        """SEC-03d: an empty graph or missing 'nodes' key does not crash."""
        from core.tools.utils.workflow_configuration_sync import WorkflowToolConfigurationUtils

        WorkflowToolConfigurationUtils.ensure_no_human_input_nodes({})
        WorkflowToolConfigurationUtils.ensure_no_human_input_nodes({"nodes": []})

    def test_node_with_missing_data_key(self) -> None:
        """SEC-03e: a node without a 'data' key does not crash validation."""
        from core.tools.utils.workflow_configuration_sync import WorkflowToolConfigurationUtils

        WorkflowToolConfigurationUtils.ensure_no_human_input_nodes({"nodes": [{}]})
# ---------------------------------------------------------------------------
# SEC-04: ToolInvokeMessage field validator injection (Low)
#
# The decode_blob_message validator now coerces message dicts based on
# the type field. Verify that the coercion logic is safe.
# ---------------------------------------------------------------------------
class TestToolInvokeMessageCoercion:
    """Test message type coercion logic in ToolInvokeMessage."""

    def test_json_type_wraps_dict_in_json_object(self) -> None:
        """
        SEC-04a: When type=JSON and message is a raw dict without
        'json_object', it should be wrapped.
        """
        from core.tools.entities.tool_entities import ToolInvokeMessage

        raw_payload = {"key": "value"}
        result = ToolInvokeMessage(type=ToolInvokeMessage.MessageType.JSON, message=raw_payload)
        assert isinstance(result.message, ToolInvokeMessage.JsonMessage)
        assert result.message.json_object == raw_payload

    def test_json_type_preserves_json_object_key(self) -> None:
        """
        SEC-04b: When type=JSON and message already has 'json_object',
        it should not double-wrap.
        """
        from core.tools.entities.tool_entities import ToolInvokeMessage

        inner = {"inner": "data"}
        result = ToolInvokeMessage(
            type=ToolInvokeMessage.MessageType.JSON,
            message={"json_object": inner},
        )
        assert isinstance(result.message, ToolInvokeMessage.JsonMessage)
        assert result.message.json_object == inner

    def test_file_type_coercion_ignores_payload(self) -> None:
        """
        SEC-04c: When type=FILE, the message dict is replaced with a
        fixed file_marker regardless of what was sent. This prevents
        any user-controlled data from being stored in the FileMessage.
        """
        from core.tools.entities.tool_entities import ToolInvokeMessage

        hostile_payload = {"arbitrary": "data", "exploit": True}
        result = ToolInvokeMessage(type=ToolInvokeMessage.MessageType.FILE, message=hostile_payload)
        assert isinstance(result.message, ToolInvokeMessage.FileMessage)
        assert result.message.file_marker == "file_marker"

    def test_json_message_accepts_list_type(self) -> None:
        """
        SEC-04d: JsonMessage.json_object now accepts list type.
        Verify it works with a list.
        """
        from core.tools.entities.tool_entities import ToolInvokeMessage

        list_payload = [1, 2, 3]
        result = ToolInvokeMessage(
            type=ToolInvokeMessage.MessageType.JSON,
            message={"json_object": list_payload},
        )
        assert isinstance(result.message, ToolInvokeMessage.JsonMessage)
        assert result.message.json_object == list_payload
# ---------------------------------------------------------------------------
# SEC-05: SSRF fix in WordExtractor (Positive Security Test)
#
# Verify that the WordExtractor now uses ssrf_proxy instead of httpx.get
# directly. This is a SSRF fix (critical vulnerability was patched).
# ---------------------------------------------------------------------------
class TestSSRFFixWordExtractor:
    """Verify SSRF protection in WordExtractor."""

    def test_word_extractor_uses_ssrf_proxy(self) -> None:
        """
        SEC-05a: WordExtractor should use ssrf_proxy.get, not httpx.get.
        Verify by checking the source code import.
        """
        import inspect

        from core.rag.extractor.word_extractor import WordExtractor

        extractor_source = inspect.getsource(WordExtractor)

        # A direct httpx.get call would bypass SSRF protections.
        uses_raw_httpx = "httpx.get(" in extractor_source
        assert not uses_raw_httpx, (
            "WordExtractor still uses httpx.get directly, which is vulnerable to SSRF"
        )
        # The hardened code path routes downloads through the SSRF proxy helper.
        assert "ssrf_proxy" in extractor_source, "WordExtractor should use ssrf_proxy for URL downloads"
# ---------------------------------------------------------------------------
# SEC-06: Removed _try_resolve_user_from_request (Positive Security Test)
#
# The WorkflowTool no longer tries to resolve user from Flask request
# context via LocalProxy, which could leak user context across async tasks.
# ---------------------------------------------------------------------------
class TestUserResolutionSecurity:
    """Verify that WorkflowTool resolves users only from database."""

    def test_no_request_context_dependency(self) -> None:
        """
        SEC-06a: WorkflowTool._resolve_user should not reference
        Flask request context or current_user in executable code.

        Inspects the module source to confirm both the removed helper
        and the risky import are absent.
        """
        import inspect

        # NOTE: only the module object is needed for inspection; importing
        # the WorkflowTool class itself was dead code and has been removed.
        import core.tools.workflow_as_tool.tool as tool_module

        module_source = inspect.getsource(tool_module)
        # The request-context resolution helper must stay deleted.
        assert "_try_resolve_user_from_request" not in module_source, (
            "_try_resolve_user_from_request has been removed for security"
        )
        # Verify that libs.login / current_user is not imported
        assert "from libs.login import current_user" not in module_source, (
            "WorkflowTool should not import current_user to avoid "
            "cross-context user leakage in async/Celery workers"
        )
# ---------------------------------------------------------------------------
# SEC-07: Error message information disclosure in tool_engine.py (Info)
#
# Error messages from tool invocations are now logged with exc_info=True.
# Verify that error responses returned to users do not leak stack traces.
# ---------------------------------------------------------------------------
class TestToolEngineErrorDisclosure:
    """Verify that tool error responses don't leak sensitive information."""

    def test_credential_error_hides_details(self) -> None:
        """
        SEC-07a: ToolProviderCredentialValidationError should return
        a generic message, not the actual exception details.
        """
        import inspect

        from core.tools.tool_engine import ToolEngine

        invoke_source = inspect.getsource(ToolEngine.agent_invoke)
        # The credential error path must respond with this fixed, generic
        # message; the real exception detail belongs in the server log only.
        assert 'Please check your tool provider credentials' in invoke_source
# ---------------------------------------------------------------------------
# SEC-08: SSRF proxy header validation (Positive Security Test)
#
# The ssrf_proxy now validates headers with Pydantic TypeAdapter.
# ---------------------------------------------------------------------------
class TestSSRFProxyHeaderValidation:
    """Verify that SSRF proxy validates headers."""

    def test_rejects_non_string_header_values(self) -> None:
        """
        SEC-08a: Headers with non-string values should be rejected.
        """
        from pydantic import ValidationError

        from core.helper.ssrf_proxy import _HEADERS_ADAPTER

        # A well-formed str->str mapping passes through unchanged.
        good_headers = {"Content-Type": "application/json"}
        assert _HEADERS_ADAPTER.validate_python(good_headers) == good_headers

        # Non-string header values (dicts, lists) must be rejected.
        bad_values = ({"nested": "object"}, [1, 2, 3])
        for bad_value in bad_values:
            with pytest.raises(ValidationError):
                _HEADERS_ADAPTER.validate_python({"X-Evil": bad_value})
# ---------------------------------------------------------------------------
# SEC-09: WorkflowTool pause_state_config set to None (Positive Test)
#
# WorkflowTool explicitly sets pause_state_config=None to prevent
# human-input pausing within tool execution context.
# ---------------------------------------------------------------------------
class TestWorkflowToolPauseDisabled:
    """Verify that WorkflowTool disables pause mechanisms."""

    def test_pause_state_config_is_none_in_source(self) -> None:
        """
        SEC-09a: WorkflowTool._invoke should set pause_state_config=None
        when calling WorkflowAppGenerator.generate.
        """
        import inspect

        from core.tools.workflow_as_tool.tool import WorkflowTool

        invoke_source = inspect.getsource(WorkflowTool._invoke)
        pause_disabled = "pause_state_config=None" in invoke_source
        assert pause_disabled, (
            "WorkflowTool must disable pause_state_config to prevent "
            "human-input nodes from pausing tool execution indefinitely"
        )