# Mirror of https://github.com/mistralai/mistral-vibe (synced 2026-04-25 17:14:55 +02:00)
# Co-authored-by: Quentin Torroba <quentin.torroba@mistral.ai>
# Co-authored-by: Clément Drouin <clement.drouin@mistral.ai>
# Co-authored-by: Clément Sirieix <clement.sirieix@mistral.ai>
# Co-authored-by: Mistral Vibe <vibe@mistral.ai>
# 671 lines, 22 KiB, Python
from __future__ import annotations
|
|
|
|
from collections import Counter
|
|
from collections.abc import Callable
|
|
from http import HTTPStatus
|
|
from typing import cast
|
|
from unittest.mock import AsyncMock, Mock
|
|
|
|
import httpx
|
|
from pydantic import BaseModel
|
|
import pytest
|
|
|
|
from tests.conftest import build_test_agent_loop, build_test_vibe_config
|
|
from tests.mock.utils import mock_llm_chunk
|
|
from tests.stubs.fake_backend import FakeBackend
|
|
from vibe.core.agents.models import BuiltinAgentName
|
|
from vibe.core.config import VibeConfig
|
|
from vibe.core.llm.exceptions import BackendErrorBuilder
|
|
from vibe.core.middleware import (
|
|
ConversationContext,
|
|
MiddlewareAction,
|
|
MiddlewareResult,
|
|
ResetReason,
|
|
)
|
|
from vibe.core.tools.builtins.todo import TodoArgs
|
|
from vibe.core.types import (
|
|
ApprovalResponse,
|
|
AssistantEvent,
|
|
FunctionCall,
|
|
LLMMessage,
|
|
RateLimitError,
|
|
ReasoningEvent,
|
|
Role,
|
|
ToolCall,
|
|
ToolCallEvent,
|
|
ToolResultEvent,
|
|
UserMessageEvent,
|
|
)
|
|
from vibe.core.utils import CancellationReason, get_user_cancellation_message
|
|
|
|
|
|
class InjectBeforeMiddleware:
    """Test middleware that injects a fixed user message ahead of every turn."""

    injected_message = "<injected>"

    async def before_turn(self, context: ConversationContext) -> MiddlewareResult:
        """Ask the pipeline to inject the canned message before the step runs."""
        return MiddlewareResult(
            action=MiddlewareAction.INJECT_MESSAGE,
            message=self.injected_message,
        )

    def reset(self, reset_reason: ResetReason = ResetReason.STOP) -> None:
        """No per-conversation state to clear."""
        return None
|
|
|
|
|
|
def make_config(
    *, enabled_tools: list[str] | None = None, tools: dict[str, dict] | None = None
) -> VibeConfig:
    """Build a minimal VibeConfig for these tests with all prompt extras disabled."""
    return build_test_vibe_config(
        system_prompt_id="tests",
        include_project_context=False,
        include_prompt_detail=False,
        include_model_info=False,
        include_commit_signature=False,
        enabled_tools=enabled_tools if enabled_tools is not None else [],
        tools=tools if tools is not None else {},
    )
|
|
|
|
|
|
@pytest.fixture
def observer_capture() -> tuple[
    list[tuple[Role, str | None]], Callable[[LLMMessage], None]
]:
    """Provide a (records, callback) pair capturing (role, content) per message."""
    records: list[tuple[Role, str | None]] = []

    def observer(msg: LLMMessage) -> None:
        records.append((msg.role, msg.content))

    return records, observer
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_act_flushes_batched_messages_with_injection_middleware(
    observer_capture,
) -> None:
    """The middleware-injected message is flushed between user and assistant."""
    captured, observer = observer_capture

    backend = FakeBackend([mock_llm_chunk(content="I can write very efficient code.")])
    agent = build_test_agent_loop(
        config=make_config(), message_observer=observer, backend=backend
    )
    agent.middleware_pipeline.add(InjectBeforeMiddleware())

    async for _ in agent.act("How can you help?"):
        pass

    roles = [role for role, _ in captured]
    contents = [content for _, content in captured]
    assert roles == [Role.system, Role.user, Role.user, Role.assistant]
    assert contents == [
        "You are Vibe, a super useful programming assistant.",
        "How can you help?",
        InjectBeforeMiddleware.injected_message,
        "I can write very efficient code.",
    ]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_stop_action_flushes_user_msg_before_returning(observer_capture) -> None:
    """With max_turns=0 the loop stops at once but still flushes the user message."""
    captured, observer = observer_capture

    # max_turns=0 forces an immediate STOP on the first before_turn
    backend = FakeBackend([
        mock_llm_chunk(content="My response will never reach you...")
    ])
    agent = build_test_agent_loop(
        config=make_config(), message_observer=observer, max_turns=0, backend=backend
    )

    async for _ in agent.act("Greet."):
        pass

    # user's message should have been flushed before returning
    assert captured == [
        (Role.system, "You are Vibe, a super useful programming assistant."),
        (Role.user, "Greet."),
    ]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_act_emits_user_and_assistant_msgs(observer_capture) -> None:
    """A plain exchange observes system, user, then assistant messages."""
    captured, observer = observer_capture

    backend = FakeBackend([mock_llm_chunk(content="Pong!")])
    agent = build_test_agent_loop(
        config=make_config(), message_observer=observer, backend=backend
    )

    async for _ in agent.act("Ping?"):
        pass

    roles = [role for role, _ in captured]
    assert roles == [Role.system, Role.user, Role.assistant]
    assert captured[1][1] == "Ping?"
    assert captured[2][1] == "Pong!"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_act_streams_chunks_in_order() -> None:
    """Streaming yields one AssistantEvent per chunk, in backend order."""
    parts = ["Hello", " from", " Vibe", "! ", "More", " and", " end"]
    backend = FakeBackend([mock_llm_chunk(content=part) for part in parts])
    agent = build_test_agent_loop(
        config=make_config(), backend=backend, enable_streaming=True
    )

    events = [event async for event in agent.act("Stream, please.")]

    streamed = [e.content for e in events if isinstance(e, AssistantEvent)]
    assert streamed == parts

    # The stored assistant message holds the concatenation of all chunks.
    final_msg = agent.messages[-1]
    assert final_msg.role == Role.assistant
    assert final_msg.content == "".join(parts)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_act_streaming_does_not_cleanup_tmp_files_directly() -> None:
    """Streaming act() must not invoke tmp-file cleanup on the session logger."""
    backend = FakeBackend([
        mock_llm_chunk(content="Hello"),
        mock_llm_chunk(content=" from"),
        mock_llm_chunk(content=" Vibe"),
    ])
    agent = build_test_agent_loop(
        config=make_config(), backend=backend, enable_streaming=True
    )
    agent.session_logger.save_interaction = AsyncMock(return_value=None)
    cleanup_spy = Mock()
    agent.session_logger.maybe_cleanup_tmp_files = cleanup_spy

    events = [event async for event in agent.act("Stream, please.")]

    streamed = [e for e in events if isinstance(e, AssistantEvent)]
    assert len(streamed) == 3
    cleanup_spy.assert_not_called()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_act_handles_streaming_with_tool_call_events_in_sequence() -> None:
    """A streamed tool call yields stream + final ToolCallEvents, then the result."""
    read_call = ToolCall(
        id="tc_stream",
        index=0,
        function=FunctionCall(name="todo", arguments='{"action": "read"}'),
    )
    backend = FakeBackend([
        [
            mock_llm_chunk(content="Checking your todos."),
            mock_llm_chunk(content="", tool_calls=[read_call]),
        ],
        [mock_llm_chunk(content="Done reviewing todos.")],
    ])
    agent = build_test_agent_loop(
        config=make_config(
            enabled_tools=["todo"], tools={"todo": {"permission": "always"}}
        ),
        backend=backend,
        agent_name=BuiltinAgentName.AUTO_APPROVE,
        enable_streaming=True,
    )

    events = [event async for event in agent.act("What about my todos?")]

    assert [type(e) for e in events] == [
        UserMessageEvent,
        AssistantEvent,
        ToolCallEvent,
        ToolCallEvent,
        ToolResultEvent,
        AssistantEvent,
    ]
    user_evt, first_text, stream_call, final_call, result, last_text = events
    assert isinstance(user_evt, UserMessageEvent)
    assert isinstance(first_text, AssistantEvent)
    assert first_text.content == "Checking your todos."
    assert isinstance(stream_call, ToolCallEvent)
    assert stream_call.args is None  # streaming event carries no parsed args yet
    assert stream_call.tool_call_id == "tc_stream"
    assert stream_call.tool_name == "todo"
    assert isinstance(final_call, ToolCallEvent)
    assert final_call.args is not None
    assert final_call.tool_name == "todo"
    assert isinstance(result, ToolResultEvent)
    assert result.error is None
    assert result.skipped is False
    assert isinstance(last_text, AssistantEvent)
    assert last_text.content == "Done reviewing todos."
    assert agent.messages[-1].content == "Done reviewing todos."
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_act_handles_tool_call_chunk_with_content() -> None:
    """A chunk carrying both text and a tool call produces every expected event."""
    read_call = ToolCall(
        id="tc_content",
        index=0,
        function=FunctionCall(name="todo", arguments='{"action": "read"}'),
    )
    backend = FakeBackend([
        mock_llm_chunk(content="Preparing "),
        mock_llm_chunk(content="todo request", tool_calls=[read_call]),
        mock_llm_chunk(content=" complete"),
    ])
    agent = build_test_agent_loop(
        config=make_config(
            enabled_tools=["todo"], tools={"todo": {"permission": "always"}}
        ),
        backend=backend,
        agent_name=BuiltinAgentName.AUTO_APPROVE,
        enable_streaming=True,
    )

    events = [event async for event in agent.act("Check todos with content.")]

    assert Counter(type(e) for e in events) == Counter({
        UserMessageEvent: 1,
        AssistantEvent: 3,
        ToolCallEvent: 2,
        ToolResultEvent: 1,
    })

    call_events = [e for e in events if isinstance(e, ToolCallEvent)]
    assert len(call_events) == 2
    # One streaming (args unparsed) and one finalized tool-call event.
    assert any(e.args is None and e.tool_call_id == "tc_content" for e in call_events)
    assert any(e.args is not None for e in call_events)

    streamed_texts = {e.content for e in events if isinstance(e, AssistantEvent)}
    assert {"Preparing ", "todo request", " complete"} <= streamed_texts

    # Content surrounding the tool call is merged into one assistant message.
    assert any(
        m.role == Role.assistant and m.content == "Preparing todo request complete"
        for m in agent.messages
    )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_act_merges_streamed_tool_call_arguments() -> None:
    """Argument fragments streamed for the same tool-call id are concatenated."""
    first_fragment = ToolCall(
        id="tc_merge",
        index=0,
        function=FunctionCall(
            name="todo", arguments='{"action": "read", "note": "First '
        ),
    )
    second_fragment = ToolCall(
        id="tc_merge", index=0, function=FunctionCall(name="todo", arguments='part"}')
    )
    backend = FakeBackend([
        mock_llm_chunk(content="Planning: "),
        mock_llm_chunk(content="", tool_calls=[first_fragment]),
        mock_llm_chunk(content="", tool_calls=[second_fragment]),
    ])
    agent = build_test_agent_loop(
        config=make_config(
            enabled_tools=["todo"], tools={"todo": {"permission": "always"}}
        ),
        backend=backend,
        agent_name=BuiltinAgentName.AUTO_APPROVE,
        enable_streaming=True,
    )

    events = [event async for event in agent.act("Merge streamed tool call args.")]

    assert [type(e) for e in events] == [
        UserMessageEvent,
        AssistantEvent,
        ToolCallEvent,
        ToolCallEvent,
        ToolResultEvent,
    ]
    assert isinstance(events[0], UserMessageEvent)
    stream_call = events[2]
    assert isinstance(stream_call, ToolCallEvent)
    assert stream_call.args is None  # streaming event carries no parsed args yet
    assert stream_call.tool_call_id == "tc_merge"
    final_call = events[3]
    assert isinstance(final_call, ToolCallEvent)
    assert final_call.tool_call_id == "tc_merge"
    assert cast(TodoArgs, final_call.args).action == "read"
    result = events[4]
    assert isinstance(result, ToolResultEvent)
    assert result.error is None
    assert result.skipped is False

    # The stored assistant message carries one tool call with merged arguments.
    with_calls = next(
        m for m in agent.messages if m.role == Role.assistant and m.tool_calls
    )
    merged = with_calls.tool_calls or []
    assert len(merged) == 1
    assert merged[0].function.arguments == '{"action": "read", "note": "First part"}'
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_act_handles_user_cancellation_during_streaming() -> None:
    """Rejecting tool approval mid-stream skips the tool and marks it cancelled."""

    class CountingMiddleware:
        """Counts before_turn invocations to prove the loop ran exactly one turn."""

        def __init__(self) -> None:
            self.before_calls = 0

        async def before_turn(self, context: ConversationContext) -> MiddlewareResult:
            self.before_calls += 1
            return MiddlewareResult()

        def reset(self, reset_reason: ResetReason = ResetReason.STOP) -> None:
            return None

    read_call = ToolCall(
        id="tc_cancel",
        index=0,
        function=FunctionCall(name="todo", arguments='{"action": "read"}'),
    )
    backend = FakeBackend([
        mock_llm_chunk(content="Preparing "),
        mock_llm_chunk(content="todo request", tool_calls=[read_call]),
    ])
    agent = build_test_agent_loop(
        config=make_config(
            enabled_tools=["todo"], tools={"todo": {"permission": "ask"}}
        ),
        backend=backend,
        agent_name=BuiltinAgentName.DEFAULT,
        enable_streaming=True,
    )
    counter = CountingMiddleware()
    agent.middleware_pipeline.add(counter)

    async def _reject_callback(
        _name: str, _args: BaseModel, _id: str, _rp: list | None = None
    ) -> tuple[ApprovalResponse, str | None]:
        reason = get_user_cancellation_message(CancellationReason.OPERATION_CANCELLED)
        return ApprovalResponse.NO, str(reason)

    agent.set_approval_callback(_reject_callback)
    agent.session_logger.save_interaction = AsyncMock(return_value=None)

    events = [event async for event in agent.act("Cancel mid stream?")]

    assert [type(e) for e in events] == [
        UserMessageEvent,
        AssistantEvent,
        ToolCallEvent,
        AssistantEvent,
        ToolCallEvent,
        ToolResultEvent,
    ]
    assert counter.before_calls == 1
    outcome = events[-1]
    assert isinstance(outcome, ToolResultEvent)
    assert outcome.skipped is True
    assert outcome.skip_reason is not None
    assert "<user_cancellation>" in outcome.skip_reason
    assert outcome.cancelled is True
    assert agent.session_logger.save_interaction.await_count >= 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_act_flushes_and_logs_when_streaming_errors(observer_capture) -> None:
    """A streaming failure still flushes pending messages and logs exactly once."""
    captured, observer = observer_capture
    backend = FakeBackend(exception_to_raise=RuntimeError("boom in streaming"))
    agent = build_test_agent_loop(
        config=make_config(),
        backend=backend,
        message_observer=observer,
        enable_streaming=True,
    )
    agent.session_logger.save_interaction = AsyncMock(return_value=None)

    with pytest.raises(RuntimeError, match="boom in streaming"):
        [_ async for _ in agent.act("Trigger stream failure")]

    roles = [role for role, _ in captured]
    assert roles == [Role.system, Role.user]
    assert agent.session_logger.save_interaction.await_count == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_rate_limit(observer_capture) -> None:
    """An HTTP 429 from the backend surfaces as RateLimitError and is logged."""
    captured, observer = observer_capture
    request = httpx.Request("POST", "http://test")
    response = httpx.Response(HTTPStatus.TOO_MANY_REQUESTS, request=request)
    http_error = httpx.HTTPStatusError(
        "rate limited", request=request, response=response
    )
    backend_error = BackendErrorBuilder.build_http_error(
        provider="mistral",
        endpoint="test",
        error=http_error,
        model="test-model",
        messages=[],
        temperature=0.0,
        has_tools=False,
        tool_choice=None,
    )
    agent = build_test_agent_loop(
        config=make_config(),
        backend=FakeBackend(exception_to_raise=backend_error),
        message_observer=observer,
        enable_streaming=True,
    )
    agent.session_logger.save_interaction = AsyncMock(return_value=None)

    with pytest.raises(RateLimitError):
        [_ async for _ in agent.act("Trigger rate limit failure while streaming")]

    roles = [role for role, _ in captured]
    assert roles == [Role.system, Role.user]
    assert agent.session_logger.save_interaction.await_count == 1
|
|
|
|
|
|
def _snapshot_events(events: list) -> list[tuple[str, str]]:
    """Project assistant/reasoning events to (class name, content) pairs, in order."""
    snapshot: list[tuple[str, str]] = []
    for event in events:
        if isinstance(event, (AssistantEvent, ReasoningEvent)):
            snapshot.append((type(event).__name__, event.content))
    return snapshot
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_reasoning_yields_before_content_on_transition() -> None:
    """All reasoning chunks stream out before the first content chunk."""
    thoughts = ["Let me think", " about this", " problem..."]
    chunks = [mock_llm_chunk(content="", reasoning_content=t) for t in thoughts]
    chunks.append(mock_llm_chunk(content="The answer is 42."))
    agent = build_test_agent_loop(
        config=make_config(), backend=FakeBackend(chunks), enable_streaming=True
    )

    events = [event async for event in agent.act("What's the answer?")]

    expected = [("ReasoningEvent", t) for t in thoughts]
    expected.append(("AssistantEvent", "The answer is 42."))
    assert _snapshot_events(events) == expected
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_reasoning_yields_per_chunk() -> None:
    """Each reasoning chunk is surfaced as its own ReasoningEvent."""
    steps = [
        "Step 1", ", Step 2", ", Step 3", ", Step 4", ", Step 5", ", Step 6", ", Final"
    ]
    chunks = [mock_llm_chunk(content="", reasoning_content=s) for s in steps]
    chunks.append(mock_llm_chunk(content="Done thinking!"))
    agent = build_test_agent_loop(
        config=make_config(), backend=FakeBackend(chunks), enable_streaming=True
    )

    events = [event async for event in agent.act("Think step by step")]

    expected = [("ReasoningEvent", s) for s in steps]
    expected.append(("AssistantEvent", "Done thinking!"))
    assert _snapshot_events(events) == expected
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_content_yields_before_reasoning_on_transition() -> None:
    """When content chunks arrive and reasoning arrives, content yields first."""
    backend = FakeBackend([
        mock_llm_chunk(content="Starting the response"),
        mock_llm_chunk(content=" here..."),
        mock_llm_chunk(content="", reasoning_content="Wait, let me reconsider"),
        mock_llm_chunk(content="", reasoning_content=" this approach..."),
        mock_llm_chunk(content="Actually, the final answer."),
    ])
    agent = build_test_agent_loop(
        config=make_config(), backend=backend, enable_streaming=True
    )

    events = [event async for event in agent.act("Give me an answer")]

    snapshot = _snapshot_events(events)
    assert snapshot == [
        ("AssistantEvent", "Starting the response"),
        ("AssistantEvent", " here..."),
        ("ReasoningEvent", "Wait, let me reconsider"),
        ("ReasoningEvent", " this approach..."),
        ("AssistantEvent", "Actually, the final answer."),
    ]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_interleaved_reasoning_content_preserves_order() -> None:
    """Alternating reasoning/content chunks keep their interleaved order."""
    pairs = [
        ("Think 1", "Answer 1 "),
        ("Think 2", "Answer 2 "),
        ("Think 3", "Answer 3"),
    ]
    chunks = []
    for thought, answer in pairs:
        chunks.append(mock_llm_chunk(content="", reasoning_content=thought))
        chunks.append(mock_llm_chunk(content=answer))
    agent = build_test_agent_loop(
        config=make_config(), backend=FakeBackend(chunks), enable_streaming=True
    )

    events = [event async for event in agent.act("Interleaved test")]

    expected = []
    for thought, answer in pairs:
        expected.append(("ReasoningEvent", thought))
        expected.append(("AssistantEvent", answer))
    assert _snapshot_events(events) == expected

    # Stored message accumulates reasoning and content separately.
    assistant_msg = next(m for m in agent.messages if m.role == Role.assistant)
    assert assistant_msg.reasoning_content == "Think 1Think 2Think 3"
    assert assistant_msg.content == "Answer 1 Answer 2 Answer 3"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_only_reasoning_chunks_yields_reasoning_event() -> None:
    """A turn with only reasoning chunks yields only ReasoningEvents."""
    thoughts = ["Just thinking...", " nothing to say yet."]
    backend = FakeBackend(
        [mock_llm_chunk(content="", reasoning_content=t) for t in thoughts]
    )
    agent = build_test_agent_loop(
        config=make_config(), backend=backend, enable_streaming=True
    )

    events = [event async for event in agent.act("Silent thinking")]

    assert _snapshot_events(events) == [("ReasoningEvent", t) for t in thoughts]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_final_buffers_flush_in_correct_order() -> None:
    """Buffers remaining at stream end flush reasoning first, then content."""
    backend = FakeBackend([
        mock_llm_chunk(content="", reasoning_content="Final thought"),
        mock_llm_chunk(content="Final words"),
    ])
    agent = build_test_agent_loop(
        config=make_config(), backend=backend, enable_streaming=True
    )

    events = [event async for event in agent.act("End buffers test")]

    snapshot = _snapshot_events(events)
    assert snapshot == [
        ("ReasoningEvent", "Final thought"),
        ("AssistantEvent", "Final words"),
    ]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_empty_content_chunks_do_not_trigger_false_yields() -> None:
    """Chunks with empty content produce no spurious AssistantEvents."""
    backend = FakeBackend([
        mock_llm_chunk(content="", reasoning_content="Reasoning here"),
        mock_llm_chunk(content=""),  # empty content must not yield an event
        mock_llm_chunk(content="", reasoning_content=" more reasoning"),
        mock_llm_chunk(content="Actual content"),
    ])
    agent = build_test_agent_loop(
        config=make_config(), backend=backend, enable_streaming=True
    )

    events = [event async for event in agent.act("Empty content test")]

    assert _snapshot_events(events) == [
        ("ReasoningEvent", "Reasoning here"),
        ("ReasoningEvent", " more reasoning"),
        ("AssistantEvent", "Actual content"),
    ]
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_streaming_assistant_event_message_id_matches_stored_message() -> None:
    """Every chunk of one assistant turn shares the stored message's message_id."""
    backend = FakeBackend([
        mock_llm_chunk(content="Hello"),
        mock_llm_chunk(content=" world"),
    ])
    agent = build_test_agent_loop(
        config=make_config(), backend=backend, enable_streaming=True
    )

    events = [event async for event in agent.act("Test")]

    streamed = [e for e in events if isinstance(e, AssistantEvent)]
    assert len(streamed) == 2

    # All chunks of the same assistant turn share one message_id
    unique_ids = {e.message_id for e in streamed}
    assert len(unique_ids) == 1

    # The stored LLMMessage must carry that same message_id
    stored = next(m for m in agent.messages if m.role == Role.assistant)
    assert stored.message_id == streamed[0].message_id
|