diff --git a/.github/workflows/build-and-upload.yml b/.github/workflows/build-and-upload.yml
index 7c63ad5..559dfe9 100644
--- a/.github/workflows/build-and-upload.yml
+++ b/.github/workflows/build-and-upload.yml
@@ -55,10 +55,10 @@ jobs:
run: echo github.repository=${{ github.repository }}
- name: Checkout repository
- uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4
+ uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5
- name: Install uv with caching
- uses: astral-sh/setup-uv@d4b2f3b6ecc6e67c4457f6d3e41ec42d3d0fcb86 # v5
+ uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7
with:
version: "latest"
enable-cache: true
@@ -140,7 +140,7 @@ jobs:
steps:
- name: Checkout repository
if: matrix.os != 'windows'
- uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4
+ uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5
- name: Install Nix
if: matrix.os != 'windows'
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 89d78f6..cd25e6c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -18,7 +18,7 @@ jobs:
steps:
- name: Checkout repository
- uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4
+ uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5
- name: Install uv with caching
uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7
@@ -58,7 +58,7 @@ jobs:
steps:
- name: Checkout repository
- uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4
+ uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5
- name: Install uv with caching
uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7
@@ -94,7 +94,7 @@ jobs:
steps:
- name: Checkout repository
- uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4
+ uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5
- name: Install uv with caching
uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index 44a5bce..c011522 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -26,7 +26,7 @@ jobs:
python-version: "3.12"
- name: Install uv
- uses: astral-sh/setup-uv@d4b2f3b6ecc6e67c4457f6d3e41ec42d3d0fcb86 # v5
+ uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7
- name: Install dependencies
run: uv sync --locked --dev
diff --git a/.vscode/launch.json b/.vscode/launch.json
index a7a74a7..25b5555 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -1,5 +1,5 @@
{
- "version": "2.6.2",
+ "version": "2.7.0",
"configurations": [
{
"name": "ACP Server",
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e83ddf5..a3078c5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,18 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
+## [2.7.0] - 2026-03-24
+
+### Added
+
+- Rewind mode to navigate and fork conversation history
+
+### Fixed
+
+- Preserve message_id when aggregating streaming LLM chunks
+- Improved error handling for SDK response errors
+
+
## [2.6.2] - 2026-03-23
### Changed
diff --git a/action.yml b/action.yml
index 33f0d55..711693a 100644
--- a/action.yml
+++ b/action.yml
@@ -38,7 +38,7 @@ runs:
python-version: ${{ inputs.python_version }}
- name: Install uv
- uses: astral-sh/setup-uv@d4b2f3b6ecc6e67c4457f6d3e41ec42d3d0fcb86 # v5
+ uses: astral-sh/setup-uv@1e862dfacbd1d6d858c55d9b792c756523627244 # v7
- name: Install Mistral Vibe
shell: bash
diff --git a/distribution/zed/extension.toml b/distribution/zed/extension.toml
index bf7df08..38ce340 100644
--- a/distribution/zed/extension.toml
+++ b/distribution/zed/extension.toml
@@ -1,7 +1,7 @@
id = "mistral-vibe"
name = "Mistral Vibe"
description = "Mistral's open-source coding assistant"
-version = "2.6.2"
+version = "2.7.0"
schema_version = 1
authors = ["Mistral AI"]
repository = "https://github.com/mistralai/mistral-vibe"
@@ -11,25 +11,25 @@ name = "Mistral Vibe"
icon = "./icons/mistral_vibe.svg"
[agent_servers.mistral-vibe.targets.darwin-aarch64]
-archive = "https://github.com/mistralai/mistral-vibe/releases/download/v2.6.2/vibe-acp-darwin-aarch64-2.6.2.zip"
+archive = "https://github.com/mistralai/mistral-vibe/releases/download/v2.7.0/vibe-acp-darwin-aarch64-2.7.0.zip"
cmd = "./vibe-acp"
[agent_servers.mistral-vibe.targets.darwin-x86_64]
-archive = "https://github.com/mistralai/mistral-vibe/releases/download/v2.6.2/vibe-acp-darwin-x86_64-2.6.2.zip"
+archive = "https://github.com/mistralai/mistral-vibe/releases/download/v2.7.0/vibe-acp-darwin-x86_64-2.7.0.zip"
cmd = "./vibe-acp"
[agent_servers.mistral-vibe.targets.linux-aarch64]
-archive = "https://github.com/mistralai/mistral-vibe/releases/download/v2.6.2/vibe-acp-linux-aarch64-2.6.2.zip"
+archive = "https://github.com/mistralai/mistral-vibe/releases/download/v2.7.0/vibe-acp-linux-aarch64-2.7.0.zip"
cmd = "./vibe-acp"
[agent_servers.mistral-vibe.targets.linux-x86_64]
-archive = "https://github.com/mistralai/mistral-vibe/releases/download/v2.6.2/vibe-acp-linux-x86_64-2.6.2.zip"
+archive = "https://github.com/mistralai/mistral-vibe/releases/download/v2.7.0/vibe-acp-linux-x86_64-2.7.0.zip"
cmd = "./vibe-acp"
[agent_servers.mistral-vibe.targets.windows-aarch64]
-archive = "https://github.com/mistralai/mistral-vibe/releases/download/v2.6.2/vibe-acp-windows-aarch64-2.6.2.zip"
+archive = "https://github.com/mistralai/mistral-vibe/releases/download/v2.7.0/vibe-acp-windows-aarch64-2.7.0.zip"
cmd = "./vibe-acp.exe"
[agent_servers.mistral-vibe.targets.windows-x86_64]
-archive = "https://github.com/mistralai/mistral-vibe/releases/download/v2.6.2/vibe-acp-windows-x86_64-2.6.2.zip"
+archive = "https://github.com/mistralai/mistral-vibe/releases/download/v2.7.0/vibe-acp-windows-x86_64-2.7.0.zip"
cmd = "./vibe-acp.exe"
diff --git a/pyproject.toml b/pyproject.toml
index 767bbf7..c991486 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "mistral-vibe"
-version = "2.6.2"
+version = "2.7.0"
description = "Minimal CLI coding agent by Mistral"
readme = "README.md"
requires-python = ">=3.12"
@@ -38,7 +38,7 @@ dependencies = [
"keyring>=25.6.0",
"markdownify>=1.2.2",
"mcp>=1.14.0",
- "mistralai==2.0.0",
+ "mistralai==2.1.3",
"opentelemetry-api>=1.39.1",
"opentelemetry-exporter-otlp-proto-http>=1.39.1",
"opentelemetry-sdk>=1.39.1",
diff --git a/tests/acp/test_initialize.py b/tests/acp/test_initialize.py
index 6f94a98..d5f09d9 100644
--- a/tests/acp/test_initialize.py
+++ b/tests/acp/test_initialize.py
@@ -28,7 +28,7 @@ class TestACPInitialize:
session_capabilities=SessionCapabilities(list=SessionListCapabilities()),
)
assert response.agent_info == Implementation(
- name="@mistralai/mistral-vibe", title="Mistral Vibe", version="2.6.2"
+ name="@mistralai/mistral-vibe", title="Mistral Vibe", version="2.7.0"
)
assert response.auth_methods == []
@@ -52,7 +52,7 @@ class TestACPInitialize:
session_capabilities=SessionCapabilities(list=SessionListCapabilities()),
)
assert response.agent_info == Implementation(
- name="@mistralai/mistral-vibe", title="Mistral Vibe", version="2.6.2"
+ name="@mistralai/mistral-vibe", title="Mistral Vibe", version="2.7.0"
)
assert response.auth_methods is not None
diff --git a/tests/backend/test_backend.py b/tests/backend/test_backend.py
index dd2c023..f68f120 100644
--- a/tests/backend/test_backend.py
+++ b/tests/backend/test_backend.py
@@ -13,9 +13,11 @@ the tests will be. Always prefer real API data over manually constructed example
from __future__ import annotations
import json
-from unittest.mock import MagicMock, patch
+from typing import ClassVar, Literal
+from unittest.mock import AsyncMock, MagicMock, patch
import httpx
+from mistralai.client.errors import SDKError
from mistralai.client.models import AssistantMessage
from mistralai.client.utils.retries import BackoffStrategy, RetryConfig
import pytest
@@ -38,7 +40,7 @@ from vibe.core.config import ModelConfig, ProviderConfig
from vibe.core.llm.backend.factory import BACKEND_FACTORY
from vibe.core.llm.backend.generic import GenericBackend
from vibe.core.llm.backend.mistral import MistralBackend, MistralMapper
-from vibe.core.llm.exceptions import BackendError
+from vibe.core.llm.exceptions import BackendError, BackendErrorBuilder
from vibe.core.llm.types import BackendLike
from vibe.core.types import Backend, FunctionCall, LLMChunk, LLMMessage, Role, ToolCall
from vibe.core.utils import get_user_agent
@@ -526,3 +528,237 @@ class TestMistralMapperPrepareMessage:
msg = LLMMessage(role=Role.assistant, content="Hello!")
result = mapper.prepare_message(msg)
assert result.content == "Hello!"
+
+ def test_strip_reasoning_removes_reasoning_from_assistant(
+ self, mapper: MistralMapper
+ ) -> None:
+ msg = LLMMessage(
+ role=Role.assistant,
+ content="Answer",
+ reasoning_content="thinking...",
+ reasoning_signature="sig",
+ )
+ stripped = mapper.strip_reasoning(msg)
+ assert stripped.content == "Answer"
+ assert stripped.reasoning_content is None
+ assert stripped.reasoning_signature is None
+
+ def test_strip_reasoning_leaves_non_assistant_unchanged(
+ self, mapper: MistralMapper
+ ) -> None:
+ msg = LLMMessage(role=Role.user, content="hello")
+ assert mapper.strip_reasoning(msg) is msg
+
+
+class TestMistralBackendReasoningEffort:
+ """Tests that MistralBackend correctly passes reasoning_effort to the SDK."""
+
+ @pytest.fixture
+ def backend(self) -> MistralBackend:
+ provider = ProviderConfig(
+ name="mistral",
+ api_base="https://api.mistral.ai/v1",
+ api_key_env_var="API_KEY",
+ )
+ return MistralBackend(provider=provider)
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ ("thinking", "expected_effort", "expected_temperature"),
+ [
+ ("off", None, 0.2),
+ ("low", "none", 1.0),
+ ("medium", "high", 1.0),
+ ("high", "high", 1.0),
+ ],
+ )
+ async def test_complete_passes_reasoning_effort(
+ self,
+ backend: MistralBackend,
+ thinking: Literal["off", "low", "medium", "high"],
+ expected_effort: str | None,
+ expected_temperature: float,
+ ) -> None:
+ model = ModelConfig(
+ name="mistral-small-latest",
+ provider="mistral",
+ alias="mistral-small",
+ thinking=thinking,
+ )
+ messages = [LLMMessage(role=Role.user, content="hi")]
+
+ with patch.object(backend, "_get_client") as mock_get_client:
+ mock_client = MagicMock()
+ mock_response = MagicMock()
+ mock_response.choices = [MagicMock()]
+ mock_response.choices[0].message.content = "hello"
+ mock_response.choices[0].message.tool_calls = None
+ mock_response.usage.prompt_tokens = 10
+ mock_response.usage.completion_tokens = 5
+ mock_client.chat.complete_async = AsyncMock(return_value=mock_response)
+ mock_get_client.return_value = mock_client
+
+ await backend.complete(
+ model=model,
+ messages=messages,
+ temperature=0.2,
+ tools=None,
+ max_tokens=None,
+ tool_choice=None,
+ extra_headers=None,
+ )
+
+ call_kwargs = mock_client.chat.complete_async.call_args.kwargs
+ assert call_kwargs["reasoning_effort"] == expected_effort
+ assert call_kwargs["temperature"] == expected_temperature
+
+ @pytest.mark.asyncio
+ async def test_complete_strips_reasoning_when_thinking_off(
+ self, backend: MistralBackend
+ ) -> None:
+ model = ModelConfig(
+ name="mistral-small-latest",
+ provider="mistral",
+ alias="mistral-small",
+ thinking="off",
+ )
+ messages = [
+ LLMMessage(role=Role.user, content="hi"),
+ LLMMessage(
+ role=Role.assistant, content="answer", reasoning_content="thinking..."
+ ),
+ LLMMessage(role=Role.user, content="follow up"),
+ ]
+
+ with patch.object(backend, "_get_client") as mock_get_client:
+ mock_client = MagicMock()
+ mock_response = MagicMock()
+ mock_response.choices = [MagicMock()]
+ mock_response.choices[0].message.content = "response"
+ mock_response.choices[0].message.tool_calls = None
+ mock_response.usage.prompt_tokens = 10
+ mock_response.usage.completion_tokens = 5
+ mock_client.chat.complete_async = AsyncMock(return_value=mock_response)
+ mock_get_client.return_value = mock_client
+
+ await backend.complete(
+ model=model,
+ messages=messages,
+ temperature=0.2,
+ tools=None,
+ max_tokens=None,
+ tool_choice=None,
+ extra_headers=None,
+ )
+
+ call_kwargs = mock_client.chat.complete_async.call_args.kwargs
+ assert call_kwargs["reasoning_effort"] is None
+ # The assistant message should have reasoning stripped
+ converted_msgs = call_kwargs["messages"]
+ assistant_msg = converted_msgs[1]
+ assert isinstance(assistant_msg, AssistantMessage)
+ assert assistant_msg.content == "answer"
+
+
+class TestBuildHttpErrorBodyReading:
+ _MESSAGES: ClassVar[list[LLMMessage]] = [LLMMessage(role=Role.user, content="hi")]
+ _COMMON_KWARGS: ClassVar[dict] = dict(
+ provider="test",
+ endpoint="https://api.test.com",
+ model="test-model",
+ messages=_MESSAGES,
+ temperature=0.2,
+ has_tools=False,
+ tool_choice=None,
+ )
+
+ def _make_sdk_error(self, response: httpx.Response) -> SDKError:
+ return SDKError("sdk error", response)
+
+ def _make_http_status_error(
+ self, response: httpx.Response
+ ) -> httpx.HTTPStatusError:
+ return httpx.HTTPStatusError(
+ "http error", request=response.request, response=response
+ )
+
+ def test_sdk_error_readable_body(self) -> None:
+ response = httpx.Response(
+ 400,
+ json={"message": "invalid temperature"},
+ request=httpx.Request("POST", "https://api.test.com"),
+ )
+ err = BackendErrorBuilder.build_http_error(
+ error=self._make_sdk_error(response), **self._COMMON_KWARGS
+ )
+ assert err.status == 400
+ assert err.parsed_error == "invalid temperature"
+ assert "invalid temperature" in err.body_text
+
+ def test_http_status_error_readable_body(self) -> None:
+ response = httpx.Response(
+ 400,
+ json={"message": "invalid temperature"},
+ request=httpx.Request("POST", "https://api.test.com"),
+ )
+ err = BackendErrorBuilder.build_http_error(
+ error=self._make_http_status_error(response), **self._COMMON_KWARGS
+ )
+ assert err.status == 400
+ assert err.parsed_error == "invalid temperature"
+ assert "invalid temperature" in err.body_text
+
+ def test_sdk_error_stream_response_falls_back_to_read(self) -> None:
+ response = httpx.Response(
+ 400,
+ stream=httpx.ByteStream(b'{"message": "context too long"}'),
+ request=httpx.Request("POST", "https://api.test.com"),
+ )
+ sdk_err = SDKError(
+ "sdk error", response, body='{"message": "context too long"}'
+ )
+ err = BackendErrorBuilder.build_http_error(error=sdk_err, **self._COMMON_KWARGS)
+ assert err.parsed_error == "context too long"
+ assert "context too long" in err.body_text
+
+ def test_http_status_error_stream_response_falls_back_to_read(self) -> None:
+ response = httpx.Response(
+ 400,
+ stream=httpx.ByteStream(b'{"message": "context too long"}'),
+ request=httpx.Request("POST", "https://api.test.com"),
+ )
+ err = BackendErrorBuilder.build_http_error(
+ error=self._make_http_status_error(response), **self._COMMON_KWARGS
+ )
+ assert err.parsed_error == "context too long"
+ assert "context too long" in err.body_text
+
+ def test_sdk_error_unreadable_response_falls_back_to_str(self) -> None:
+ response = MagicMock(spec=httpx.Response)
+ response.status_code = 400
+ response.reason_phrase = "Bad Request"
+ response.headers = {}
+ type(response).text = property(lambda self: (_ for _ in ()).throw(Exception))
+ response.read.side_effect = Exception("closed")
+
+ sdk_err = SDKError("sdk msg", response, body='{"message": "context too long"}')
+ err = BackendErrorBuilder.build_http_error(error=sdk_err, **self._COMMON_KWARGS)
+ assert err.body_text == '{"message": "context too long"}'
+ assert err.parsed_error == "context too long"
+
+ def test_http_status_error_unreadable_response_falls_back_to_str(self) -> None:
+ response = MagicMock(spec=httpx.Response)
+ response.status_code = 400
+ response.reason_phrase = "Bad Request"
+ response.headers = {}
+ type(response).text = property(lambda self: (_ for _ in ()).throw(Exception))
+ response.read.side_effect = Exception("closed")
+ response.request = httpx.Request("POST", "https://api.test.com")
+
+ http_err = httpx.HTTPStatusError(
+ "http error with details", request=response.request, response=response
+ )
+ err = BackendErrorBuilder.build_http_error(
+ error=http_err, **self._COMMON_KWARGS
+ )
+ assert "http error with details" in err.body_text
diff --git a/tests/cli/textual_ui/windowing/test_session_windowing.py b/tests/cli/textual_ui/windowing/test_session_windowing.py
new file mode 100644
index 0000000..28e1aa9
--- /dev/null
+++ b/tests/cli/textual_ui/windowing/test_session_windowing.py
@@ -0,0 +1,47 @@
+from __future__ import annotations
+
+from vibe.cli.textual_ui.windowing.state import LOAD_MORE_BATCH_SIZE, SessionWindowing
+from vibe.core.types import LLMMessage, Role
+
+
+def _msg(
+ role: Role = Role.user, *, content: str = "x", injected: bool = False
+) -> LLMMessage:
+ return LLMMessage(role=role, content=content, injected=injected)
+
+
+def test_recompute_backfill_keeps_cursor_when_oldest_widgets_skip_injected_prefix() -> (
+ None
+):
+ """min(visible_indices) can sit past injected-only tail slots; backfill must not overlap."""
+ w = SessionWindowing(LOAD_MORE_BATCH_SIZE)
+ w.set_backfill([_msg() for _ in range(80)])
+ assert w.remaining == 80
+
+ history = [
+ *[_msg() for _ in range(80)],
+ *[_msg(injected=True) for _ in range(5)],
+ _msg(content="visible"),
+ ]
+ visible_indices = [85]
+ has_backfill = w.recompute_backfill(
+ history, visible_indices=visible_indices, visible_history_widgets_count=1
+ )
+ assert has_backfill
+ assert w.remaining == 80
+
+
+def test_recompute_backfill_advances_cursor_when_prefix_was_pruned_not_injected() -> (
+ None
+):
+ """If DOM lost widgets for non-injected messages, align cursor with oldest remaining widget."""
+ history = [_msg() for _ in range(100)]
+ w = SessionWindowing(LOAD_MORE_BATCH_SIZE)
+ w.set_backfill(history[:70])
+ assert w.remaining == 70
+
+ has_backfill = w.recompute_backfill(
+ history, visible_indices=[80], visible_history_widgets_count=10
+ )
+ assert has_backfill
+ assert w.remaining == 80
diff --git a/tests/core/test_rewind_integration.py b/tests/core/test_rewind_integration.py
new file mode 100644
index 0000000..0b84adc
--- /dev/null
+++ b/tests/core/test_rewind_integration.py
@@ -0,0 +1,308 @@
+"""Integration tests for the rewind feature through agent_loop.act().
+
+These tests drive the full pipeline:
+ act() → create_checkpoint() → _process_one_tool_call() →
+ get_file_snapshot() → add_snapshot() → tool writes file →
+ rewind_to_message() → verify file restoration.
+"""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+
+import pytest
+
+from tests.conftest import build_test_agent_loop, build_test_vibe_config
+from tests.mock.utils import mock_llm_chunk
+from tests.stubs.fake_backend import FakeBackend
+from vibe.core.agents.models import BuiltinAgentName
+from vibe.core.types import BaseEvent, FunctionCall, ToolCall
+
+
+async def _act_and_collect(agent_loop, prompt: str) -> list[BaseEvent]:
+ return [ev async for ev in agent_loop.act(prompt)]
+
+
+def _write_file_tool_call(
+ path: str, content: str, *, call_id: str = "call_1", overwrite: bool = False
+) -> ToolCall:
+ args = json.dumps({"path": path, "content": content, "overwrite": overwrite})
+ return ToolCall(
+ id=call_id, index=0, function=FunctionCall(name="write_file", arguments=args)
+ )
+
+
+def _search_replace_tool_call(
+ file_path: str, search: str, replace: str, *, call_id: str = "call_1"
+) -> ToolCall:
+ content = f"<<<<<<< SEARCH\n{search}\n=======\n{replace}\n>>>>>>> REPLACE"
+ args = json.dumps({"file_path": file_path, "content": content})
+ return ToolCall(
+ id=call_id,
+ index=0,
+ function=FunctionCall(name="search_replace", arguments=args),
+ )
+
+
+def _bash_tool_call(command: str, *, call_id: str = "call_1") -> ToolCall:
+ args = json.dumps({"command": command})
+ return ToolCall(
+ id=call_id, index=0, function=FunctionCall(name="bash", arguments=args)
+ )
+
+
+def _make_agent_loop(backend: FakeBackend):
+ config = build_test_vibe_config(
+ enabled_tools=["write_file", "search_replace", "bash"],
+ tools={
+ "write_file": {"permission": "always"},
+ "search_replace": {"permission": "always"},
+ "bash": {"permission": "always"},
+ },
+ system_prompt_id="tests",
+ include_project_context=False,
+ include_prompt_detail=False,
+ )
+ return build_test_agent_loop(
+ config=config, agent_name=BuiltinAgentName.AUTO_APPROVE, backend=backend
+ )
+
+
+@pytest.mark.asyncio
+class TestRewindIntegration:
+ async def test_write_file_rewind_restores_original(
+ self, tmp_working_directory: Path
+ ) -> None:
+ """Write a file in turn 1, rewind to turn 1 → file should not exist."""
+ target = tmp_working_directory / "hello.txt"
+
+ backend = FakeBackend([
+ [
+ mock_llm_chunk(
+ content="Creating file.",
+ tool_calls=[_write_file_tool_call(str(target), "hello world")],
+ )
+ ],
+ [mock_llm_chunk(content="Done.")],
+ ])
+ agent_loop = _make_agent_loop(backend)
+
+ await _act_and_collect(agent_loop, "create hello.txt")
+ assert target.read_text() == "hello world"
+
+ rm = agent_loop.rewind_manager
+ rewindable = rm.get_rewindable_messages()
+ assert len(rewindable) == 1
+ await rm.rewind_to_message(rewindable[0][0], restore_files=True)
+
+ assert not target.exists()
+
+ async def test_search_replace_rewind_restores_previous_version(
+ self, tmp_working_directory: Path
+ ) -> None:
+ """Edit a pre-existing file with search_replace, rewind restores original."""
+ target = tmp_working_directory / "config.yaml"
+ target.write_text("key: original\n", encoding="utf-8")
+
+ backend = FakeBackend([
+ [
+ mock_llm_chunk(
+ content="Updating config.",
+ tool_calls=[
+ _search_replace_tool_call(
+ str(target), "key: original", "key: modified"
+ )
+ ],
+ )
+ ],
+ [mock_llm_chunk(content="Updated.")],
+ ])
+ agent_loop = _make_agent_loop(backend)
+
+ await _act_and_collect(agent_loop, "update config")
+ assert target.read_text() == "key: modified\n"
+
+ rm = agent_loop.rewind_manager
+ rewindable = rm.get_rewindable_messages()
+ await rm.rewind_to_message(rewindable[0][0], restore_files=True)
+
+ assert target.read_text() == "key: original\n"
+
+ async def test_write_then_search_replace_rewind_to_middle(
+ self, tmp_working_directory: Path
+ ) -> None:
+ """Turn 1 creates a file with write_file, turn 2 patches it with
+ search_replace. Rewind to turn 2 restores the turn 1 version.
+ """
+ target = tmp_working_directory / "app.py"
+
+ backend = FakeBackend([
+ # Turn 1: create the file
+ [
+ mock_llm_chunk(
+ content="Creating.",
+ tool_calls=[
+ _write_file_tool_call(str(target), "def main():\n pass\n")
+ ],
+ )
+ ],
+ [mock_llm_chunk(content="Created.")],
+ # Turn 2: patch with search_replace
+ [
+ mock_llm_chunk(
+ content="Updating.",
+ tool_calls=[
+ _search_replace_tool_call(
+ str(target),
+ " pass",
+ ' print("hello")',
+ call_id="call_2",
+ )
+ ],
+ )
+ ],
+ [mock_llm_chunk(content="Updated.")],
+ ])
+ agent_loop = _make_agent_loop(backend)
+
+ await _act_and_collect(agent_loop, "create app.py")
+ assert "pass" in target.read_text()
+
+ await _act_and_collect(agent_loop, "update app.py")
+ assert 'print("hello")' in target.read_text()
+
+ rm = agent_loop.rewind_manager
+ rewindable = rm.get_rewindable_messages()
+ assert len(rewindable) == 2
+
+ await rm.rewind_to_message(rewindable[1][0], restore_files=True)
+ assert target.read_text() == "def main():\n pass\n"
+
+ async def test_rewind_without_restore_keeps_files(
+ self, tmp_working_directory: Path
+ ) -> None:
+ """Rewind with restore_files=False keeps the file as-is."""
+ target = tmp_working_directory / "data.json"
+
+ backend = FakeBackend([
+ [
+ mock_llm_chunk(
+ content="Writing.",
+ tool_calls=[_write_file_tool_call(str(target), '{"a": 1}')],
+ )
+ ],
+ [mock_llm_chunk(content="Done.")],
+ ])
+ agent_loop = _make_agent_loop(backend)
+
+ await _act_and_collect(agent_loop, "write data")
+ assert target.read_text() == '{"a": 1}'
+
+ rm = agent_loop.rewind_manager
+ rewindable = rm.get_rewindable_messages()
+ await rm.rewind_to_message(rewindable[0][0], restore_files=False)
+
+ assert target.read_text() == '{"a": 1}'
+
+ async def test_rewind_then_new_turn(self, tmp_working_directory: Path) -> None:
+ """After rewind, a new turn creates fresh checkpoints that work correctly."""
+ target = tmp_working_directory / "code.py"
+
+ backend = FakeBackend([
+ # Turn 1
+ [
+ mock_llm_chunk(
+ content="v1.", tool_calls=[_write_file_tool_call(str(target), "v1")]
+ )
+ ],
+ [mock_llm_chunk(content="ok")],
+ # Turn 2
+ [
+ mock_llm_chunk(
+ content="v2.",
+ tool_calls=[
+ _write_file_tool_call(
+ str(target), "v2", call_id="call_2", overwrite=True
+ )
+ ],
+ )
+ ],
+ [mock_llm_chunk(content="ok")],
+ # Turn 3 (after rewind, new turn)
+ [
+ mock_llm_chunk(
+ content="v2bis.",
+ tool_calls=[
+ _write_file_tool_call(
+ str(target), "v2bis", call_id="call_3", overwrite=True
+ )
+ ],
+ )
+ ],
+ [mock_llm_chunk(content="ok")],
+ ])
+ agent_loop = _make_agent_loop(backend)
+
+ await _act_and_collect(agent_loop, "turn1")
+ await _act_and_collect(agent_loop, "turn2")
+ assert target.read_text() == "v2"
+
+ rm = agent_loop.rewind_manager
+ rewindable = rm.get_rewindable_messages()
+ await rm.rewind_to_message(rewindable[1][0], restore_files=True)
+ assert target.read_text() == "v1"
+
+ await _act_and_collect(agent_loop, "turn2bis")
+ assert target.read_text() == "v2bis"
+
+ rewindable = rm.get_rewindable_messages()
+ await rm.rewind_to_message(rewindable[1][0], restore_files=True)
+ assert target.read_text() == "v1"
+
+ async def test_rewind_restores_file_deleted_by_bash(
+ self, tmp_working_directory: Path
+ ) -> None:
+ """A tracked file deleted via bash in turn 2 is restored on rewind.
+
+ Turn 1: create the file with write_file (file becomes tracked).
+ Turn 2: delete it with bash (bash has no snapshot, but
+ create_checkpoint re-reads known files).
+ Rewind to turn 2 → file restored to its turn-1 content.
+ """
+ target = tmp_working_directory / "important.txt"
+
+ backend = FakeBackend([
+ # Turn 1: create the file
+ [
+ mock_llm_chunk(
+ content="Creating.",
+ tool_calls=[_write_file_tool_call(str(target), "precious data")],
+ )
+ ],
+ [mock_llm_chunk(content="Created.")],
+ # Turn 2: delete it via bash
+ [
+ mock_llm_chunk(
+ content="Deleting.",
+ tool_calls=[_bash_tool_call(f"rm {target}", call_id="call_2")],
+ )
+ ],
+ [mock_llm_chunk(content="Deleted.")],
+ ])
+ agent_loop = _make_agent_loop(backend)
+
+ await _act_and_collect(agent_loop, "create file")
+ assert target.read_text() == "precious data"
+
+ await _act_and_collect(agent_loop, "delete file")
+ assert not target.exists()
+
+ rm = agent_loop.rewind_manager
+ rewindable = rm.get_rewindable_messages()
+ assert len(rewindable) == 2
+
+ # Rewind to turn 2 → restores the file to its state before turn 2
+ await rm.rewind_to_message(rewindable[1][0], restore_files=True)
+ assert target.exists()
+ assert target.read_text() == "precious data"
diff --git a/tests/core/test_rewind_manager.py b/tests/core/test_rewind_manager.py
new file mode 100644
index 0000000..4fed440
--- /dev/null
+++ b/tests/core/test_rewind_manager.py
@@ -0,0 +1,594 @@
+from __future__ import annotations
+
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+from vibe.core.rewind.manager import FileSnapshot, RewindError, RewindManager
+from vibe.core.types import LLMMessage, MessageList, Role
+
+
+def _make_messages(*contents: str) -> MessageList:
+ """Create a MessageList with a system message followed by user/assistant pairs."""
+ msgs = MessageList([LLMMessage(role=Role.system, content="system")])
+ for content in contents:
+ msgs.append(LLMMessage(role=Role.user, content=content))
+ msgs.append(LLMMessage(role=Role.assistant, content=f"reply to {content}"))
+ return msgs
+
+
+def _snap(path: Path) -> FileSnapshot:
+ """Create a FileSnapshot by reading a file (or None if missing)."""
+ resolved = str(path.resolve())
+ try:
+ content: bytes | None = path.read_bytes()
+ except FileNotFoundError:
+ content = None
+ return FileSnapshot(path=resolved, content=content)
+
+
+def _make_manager(
+ messages: MessageList,
+) -> tuple[RewindManager, list[bool], list[bool]]:
+ save_calls: list[bool] = []
+ reset_calls: list[bool] = []
+
+ async def save_messages() -> None:
+ save_calls.append(True)
+
+ def reset_session() -> None:
+ reset_calls.append(True)
+
+ mgr = RewindManager(
+ messages=messages, save_messages=save_messages, reset_session=reset_session
+ )
+ return mgr, save_calls, reset_calls
+
+
+class TestCheckpoints:
+ def test_create_checkpoint_carries_forward_snapshots(self, tmp_path: Path) -> None:
+ messages = _make_messages("hello", "world")
+ mgr, _, _ = _make_manager(messages)
+ f = tmp_path / "f.txt"
+ f.write_text("v1", encoding="utf-8")
+
+ mgr.create_checkpoint()
+ mgr.add_snapshot(_snap(f))
+ f.write_text("v2", encoding="utf-8")
+
+ mgr.create_checkpoint()
+
+ # Second checkpoint should have re-read the file
+ assert len(mgr.checkpoints) == 2
+ assert mgr.checkpoints[1].files[0].content == b"v2"
+
+ def test_add_snapshot_to_all_checkpoints(self, tmp_path: Path) -> None:
+ messages = _make_messages("hello", "world")
+ mgr, _, _ = _make_manager(messages)
+
+ mgr.create_checkpoint()
+ mgr.create_checkpoint()
+
+ f = tmp_path / "late.txt"
+ f.write_text("content", encoding="utf-8")
+ mgr.add_snapshot(_snap(f))
+
+ resolved = str(f.resolve())
+ assert any(s.path == resolved for s in mgr.checkpoints[0].files)
+ assert any(s.path == resolved for s in mgr.checkpoints[1].files)
+
+ def test_add_snapshot_no_duplicate(self, tmp_path: Path) -> None:
+ messages = _make_messages("hello")
+ mgr, _, _ = _make_manager(messages)
+ mgr.create_checkpoint()
+
+ f = tmp_path / "f.txt"
+ f.write_text("content", encoding="utf-8")
+ mgr.add_snapshot(_snap(f))
+ mgr.add_snapshot(_snap(f))
+
+ resolved = str(f.resolve())
+ matches = [s for s in mgr.checkpoints[0].files if s.path == resolved]
+ assert len(matches) == 1
+
+ def test_has_changes_detects_new_file(self, tmp_path: Path) -> None:
+ messages = _make_messages("hello")
+ mgr, _, _ = _make_manager(messages)
+ f = tmp_path / "new.txt"
+
+ mgr.create_checkpoint()
+ mgr.add_snapshot(FileSnapshot(path=str(f.resolve()), content=None))
+ assert not mgr.has_file_changes_at(len(messages))
+
+ f.write_text("created", encoding="utf-8")
+ assert mgr.has_file_changes_at(len(messages))
+
+ def test_has_changes_false_when_unchanged(self, tmp_path: Path) -> None:
+ messages = _make_messages("hello")
+ mgr, _, _ = _make_manager(messages)
+ f = tmp_path / "f.txt"
+ f.write_text("content", encoding="utf-8")
+
+ mgr.create_checkpoint()
+ mgr.add_snapshot(_snap(f))
+ assert not mgr.has_file_changes_at(len(messages))
+
+ def test_has_file_changes_at_no_checkpoint(self) -> None:
+ messages = _make_messages("hello")
+ mgr, _, _ = _make_manager(messages)
+ assert not mgr.has_file_changes_at(1)
+
+
+class TestRewind:
+ def test_get_rewindable_messages(self) -> None:
+ messages = _make_messages("hello", "world")
+ mgr, _, _ = _make_manager(messages)
+
+ result = mgr.get_rewindable_messages()
+
+ assert len(result) == 2
+ assert result[0] == (1, "hello")
+ assert result[1] == (3, "world")
+
+ def test_get_rewindable_messages_excludes_injected(self) -> None:
+ messages = _make_messages("hello")
+ # Insert an injected middleware message between turns
+ messages.append(
+ LLMMessage(role=Role.user, content="plan mode reminder", injected=True)
+ )
+ messages.append(LLMMessage(role=Role.user, content="world"))
+ messages.append(LLMMessage(role=Role.assistant, content="reply to world"))
+ mgr, _, _ = _make_manager(messages)
+
+ result = mgr.get_rewindable_messages()
+
+ assert len(result) == 2
+ assert result[0] == (1, "hello")
+ # Index 3 is the injected message — it must be skipped
+ assert result[1] == (4, "world")
+
+ @pytest.mark.asyncio
+ async def test_rewind_to_message(self) -> None:
+ messages = _make_messages("hello", "world")
+ mgr, save_calls, reset_calls = _make_manager(messages)
+
+ content, errors = await mgr.rewind_to_message(3, restore_files=False)
+
+ assert content == "world"
+ assert errors == []
+ assert len(save_calls) == 1
+ assert len(reset_calls) == 1
+ assert len(messages) == 3
+
+ @pytest.mark.asyncio
+ async def test_rewind_to_message_invalid_index(self) -> None:
+ messages = _make_messages("hello")
+ mgr, _, _ = _make_manager(messages)
+
+ with pytest.raises(RewindError, match="Invalid message index"):
+ await mgr.rewind_to_message(99, restore_files=False)
+
+ @pytest.mark.asyncio
+ async def test_rewind_to_message_not_user(self) -> None:
+ messages = _make_messages("hello")
+ mgr, _, _ = _make_manager(messages)
+
+ with pytest.raises(RewindError, match="not a user message"):
+ await mgr.rewind_to_message(2, restore_files=False)
+
+ def test_messages_reset_clears_checkpoints(self) -> None:
+ messages = _make_messages("hello")
+ mgr, _, _ = _make_manager(messages)
+ mgr.create_checkpoint()
+
+ assert len(mgr.checkpoints) == 1
+
+ messages.reset([LLMMessage(role=Role.system, content="system")])
+
+ assert len(mgr.checkpoints) == 0
+
+ @pytest.mark.asyncio
+ async def test_rewind_preserves_earlier_checkpoints(self) -> None:
+ messages = _make_messages("hello", "world")
+ mgr, _, _ = _make_manager(messages)
+
+ mgr.create_checkpoint()
+ mgr._checkpoints[-1].message_index = 1
+ mgr.create_checkpoint()
+ mgr._checkpoints[-1].message_index = 3
+
+ assert len(mgr.checkpoints) == 2
+
+ await mgr.rewind_to_message(3, restore_files=False)
+
+ assert len(mgr.checkpoints) == 1
+
+ def test_update_system_prompt_preserves_checkpoints(self) -> None:
+ """Switching agents via shift+tab calls update_system_prompt which must
+ NOT clear rewind checkpoints (unlike a full reset).
+ """
+ messages = _make_messages("hello", "world")
+ mgr, _, _ = _make_manager(messages)
+
+ mgr.create_checkpoint()
+ mgr._checkpoints[-1].message_index = 1
+ mgr.create_checkpoint()
+ mgr._checkpoints[-1].message_index = 3
+
+ assert len(mgr.checkpoints) == 2
+
+ # Simulate shift+tab agent switch: only the system prompt changes
+ messages.update_system_prompt("new agent system prompt")
+
+ assert len(mgr.checkpoints) == 2
+ assert messages[0].content == "new agent system prompt"
+
+ def test_create_checkpoint_uses_current_message_count(self) -> None:
+ messages = _make_messages("hello")
+ mgr, _, _ = _make_manager(messages)
+
+ mgr.create_checkpoint()
+
+ assert mgr.checkpoints[0].message_index == len(messages)
+
+
+class _Turn:
+ """Helper that simulates one conversation turn."""
+
+ def __init__(self, mgr: RewindManager, messages: MessageList) -> None:
+ self._mgr = mgr
+ self._messages = messages
+
+ def begin(self, user_msg: str) -> None:
+ """Start a new turn: create checkpoint then append user message.
+
+ This mirrors agent_loop.act() which calls create_checkpoint()
+ *before* the user message is added to the message list.
+ """
+ self._mgr.create_checkpoint()
+ self._messages.append(LLMMessage(role=Role.user, content=user_msg))
+
+ def tool_write(self, path: Path, content: str) -> None:
+ """Simulate a tool writing to a file (snapshot → write)."""
+ self._mgr.add_snapshot(_snap(path))
+ path.parent.mkdir(parents=True, exist_ok=True)
+ path.write_text(content, encoding="utf-8")
+
+ def tool_delete(self, path: Path) -> None:
+ """Simulate a tool deleting a file (snapshot → unlink)."""
+ self._mgr.add_snapshot(_snap(path))
+ path.unlink()
+
+ def end(self, assistant_reply: str = "ok") -> None:
+ """End the turn: append assistant reply."""
+ self._messages.append(LLMMessage(role=Role.assistant, content=assistant_reply))
+
+
+@pytest.mark.asyncio
+class TestRewindScenarios:
+ @staticmethod
+ def _setup() -> tuple[RewindManager, MessageList, _Turn]:
+ messages = MessageList([LLMMessage(role=Role.system, content="system")])
+ mgr, _, _ = _make_manager(messages)
+ return mgr, messages, _Turn(mgr, messages)
+
+ async def test_edit_file_across_turns_rewind_to_middle(
+ self, tmp_path: Path
+ ) -> None:
+ """A file is edited every turn. Rewinding restores the version from
+ *before* the target turn.
+ """
+ mgr, messages, turn = self._setup()
+ f = tmp_path / "app.py"
+ f.write_text("v0", encoding="utf-8")
+
+ turn.begin("turn1")
+ turn.tool_write(f, "v1")
+ turn.end()
+
+ turn.begin("turn2")
+ turn.tool_write(f, "v2")
+ turn.end()
+
+ turn.begin("turn3")
+ turn.tool_write(f, "v3")
+ turn.end()
+
+ # Rewind to turn2 (message index 3) → file should be v1
+ rewindable = mgr.get_rewindable_messages()
+ turn2_idx = rewindable[1][0]
+ await mgr.rewind_to_message(turn2_idx, restore_files=True)
+
+ assert f.read_text(encoding="utf-8") == "v1"
+
+ async def test_file_created_then_rewind_before_creation(
+ self, tmp_path: Path
+ ) -> None:
+ """A file that didn't exist before should be deleted on rewind."""
+ mgr, messages, turn = self._setup()
+ new_file = tmp_path / "generated.py"
+
+ turn.begin("turn1")
+ turn.end()
+
+ turn.begin("turn2")
+ turn.tool_write(new_file, "print('hello')")
+ turn.end()
+
+ turn1_idx = mgr.get_rewindable_messages()[0][0]
+ await mgr.rewind_to_message(turn1_idx, restore_files=True)
+
+ assert not new_file.exists()
+
+ async def test_file_deleted_by_tool_rewind_restores(self, tmp_path: Path) -> None:
+ """Rewinding past a deletion restores the file."""
+ mgr, _, turn = self._setup()
+ f = tmp_path / "config.yaml"
+ f.write_text("key: value", encoding="utf-8")
+
+ turn.begin("turn1")
+ turn.tool_write(f, "key: value") # touch to start tracking
+ turn.end()
+
+ turn.begin("turn2")
+ turn.tool_delete(f)
+ turn.end()
+
+ assert not f.exists()
+
+ turn2_idx = mgr.get_rewindable_messages()[1][0]
+ await mgr.rewind_to_message(turn2_idx, restore_files=True)
+
+ assert f.exists()
+ assert f.read_text(encoding="utf-8") == "key: value"
+
+ async def test_mixed_create_and_edit(self, tmp_path: Path) -> None:
+ """Multiple files: one pre-existing and edited, one created mid-session."""
+ mgr, messages, turn = self._setup()
+ existing = tmp_path / "main.py"
+ existing.write_text("original", encoding="utf-8")
+
+ turn.begin("turn1")
+ turn.tool_write(existing, "modified")
+ turn.end()
+
+ turn.begin("turn2")
+ new_file = tmp_path / "utils.py"
+ turn.tool_write(new_file, "def helper(): ...")
+ turn.tool_write(existing, "modified again")
+ turn.end()
+
+ turn1_idx = mgr.get_rewindable_messages()[0][0]
+ await mgr.rewind_to_message(turn1_idx, restore_files=True)
+
+ assert existing.read_text(encoding="utf-8") == "original"
+ assert not new_file.exists()
+
+ async def test_user_manual_edit_between_turns(self, tmp_path: Path) -> None:
+ """If the user edits a file between turns, the checkpoint at the next
+ turn captures the user's version, so rewind restores it.
+ """
+ mgr, _, turn = self._setup()
+ f = tmp_path / "readme.md"
+ f.write_text("initial", encoding="utf-8")
+
+ turn.begin("turn1")
+ turn.tool_write(f, "tool wrote this")
+ turn.end()
+
+ # User manually edits the file outside the tool loop
+ f.write_text("user edited this", encoding="utf-8")
+
+ turn.begin("turn2")
+ turn.tool_write(f, "tool overwrote user")
+ turn.end()
+
+ # Rewind to turn2 → should restore the user's manual edit
+ turn2_idx = mgr.get_rewindable_messages()[1][0]
+ await mgr.rewind_to_message(turn2_idx, restore_files=True)
+
+ assert f.read_text(encoding="utf-8") == "user edited this"
+
+ async def test_rewind_without_restore(self, tmp_path: Path) -> None:
+ """Rewinding with restore_files=False truncates messages but keeps
+ files as they are.
+ """
+ mgr, messages, turn = self._setup()
+ f = tmp_path / "data.json"
+ f.write_text("{}", encoding="utf-8")
+
+ turn.begin("turn1")
+ turn.tool_write(f, '{"a": 1}')
+ turn.end()
+
+ turn.begin("turn2")
+ turn.tool_write(f, '{"a": 1, "b": 2}')
+ turn.end()
+
+ turn1_idx = mgr.get_rewindable_messages()[0][0]
+ await mgr.rewind_to_message(turn1_idx, restore_files=False)
+
+ # File untouched
+ assert f.read_text(encoding="utf-8") == '{"a": 1, "b": 2}'
+ # But messages were truncated
+ assert len(messages) == 1 # only system message
+
+ async def test_rewind_then_new_turns_then_rewind_again(
+ self, tmp_path: Path
+ ) -> None:
+ """After a rewind, new turns create new checkpoints. A second rewind
+ should work correctly with the new history.
+ """
+ mgr, _, turn = self._setup()
+ f = tmp_path / "code.py"
+ f.write_text("v0", encoding="utf-8")
+
+ turn.begin("turn1")
+ turn.tool_write(f, "v1")
+ turn.end()
+
+ turn.begin("turn2")
+ turn.tool_write(f, "v2")
+ turn.end()
+
+ # Rewind to turn2
+ turn2_idx = mgr.get_rewindable_messages()[1][0]
+ await mgr.rewind_to_message(turn2_idx, restore_files=True)
+ assert f.read_text(encoding="utf-8") == "v1"
+
+ # New turn after rewind
+ turn.begin("turn2-bis")
+ turn.tool_write(f, "v2-bis")
+ turn.end()
+
+ turn.begin("turn3-bis")
+ turn.tool_write(f, "v3-bis")
+ turn.end()
+
+ # Rewind to turn2-bis
+ turn2bis_idx = mgr.get_rewindable_messages()[1][0]
+ await mgr.rewind_to_message(turn2bis_idx, restore_files=True)
+ assert f.read_text(encoding="utf-8") == "v1"
+
+ async def test_agent_switch_between_turns_preserves_rewind(
+ self, tmp_path: Path
+ ) -> None:
+ """Pressing shift+tab between two messages switches agents, which calls
+ update_system_prompt. Checkpoints must survive so a subsequent rewind
+ restores files correctly.
+ """
+ mgr, messages, turn = self._setup()
+ f = tmp_path / "main.py"
+ f.write_text("v0", encoding="utf-8")
+
+ turn.begin("turn1")
+ turn.tool_write(f, "v1")
+ turn.end()
+
+ # User presses shift+tab → agent switch → system prompt replaced
+ messages.update_system_prompt("switched agent prompt")
+
+ turn.begin("turn2")
+ turn.tool_write(f, "v2")
+ turn.end()
+
+ # Rewind to turn2 should restore "v1"
+ turn2_idx = mgr.get_rewindable_messages()[1][0]
+ await mgr.rewind_to_message(turn2_idx, restore_files=True)
+
+ assert f.read_text(encoding="utf-8") == "v1"
+
+ async def test_binary_file_snapshot_and_restore(self, tmp_path: Path) -> None:
+ """Binary files (non-UTF-8) are snapshotted and restored correctly."""
+ mgr, _, turn = self._setup()
+ f = tmp_path / "image.bin"
+ original = bytes(range(256))
+ f.write_bytes(original)
+
+ turn.begin("turn1")
+ mgr.add_snapshot(_snap(f))
+ f.write_bytes(b"\x00" * 256)
+ turn.end()
+
+ turn1_idx = mgr.get_rewindable_messages()[0][0]
+ await mgr.rewind_to_message(turn1_idx, restore_files=True)
+
+ assert f.read_bytes() == original
+
+ async def test_create_edit_delete_full_lifecycle(self, tmp_path: Path) -> None:
+ """File goes through create → edit → delete. Rewind to each point
+ restores the correct state.
+ """
+ mgr, _, turn = self._setup()
+ f = tmp_path / "temp.txt"
+
+ turn.begin("turn1")
+ turn.tool_write(f, "created")
+ turn.end()
+
+ turn.begin("turn2")
+ turn.tool_write(f, "edited")
+ turn.end()
+
+ turn.begin("turn3")
+ turn.tool_delete(f)
+ turn.end()
+
+ assert not f.exists()
+
+ # Rewind to turn3 → file should be "edited" (state before deletion)
+ turn3_idx = mgr.get_rewindable_messages()[2][0]
+ await mgr.rewind_to_message(turn3_idx, restore_files=True)
+ assert f.read_text(encoding="utf-8") == "edited"
+
+ async def test_user_creates_file_tool_overwrites(self, tmp_path: Path) -> None:
+ """User creates a file manually before a turn. The tool overwrites it.
+ Rewind restores the user's version.
+ """
+ mgr, _, turn = self._setup()
+ f = tmp_path / "notes.txt"
+
+ turn.begin("turn1")
+ turn.end()
+
+ # User creates the file manually between turns
+ f.write_text("user notes", encoding="utf-8")
+
+ turn.begin("turn2")
+ turn.tool_write(f, "overwritten by tool")
+ turn.end()
+
+ turn2_idx = mgr.get_rewindable_messages()[1][0]
+ await mgr.rewind_to_message(turn2_idx, restore_files=True)
+
+ assert f.read_text(encoding="utf-8") == "user notes"
+
+ async def test_nested_directory_files(self, tmp_path: Path) -> None:
+ """Files in nested directories are restored including parent dirs."""
+ mgr, _, turn = self._setup()
+ deep = tmp_path / "src" / "pkg" / "module.py"
+
+ turn.begin("turn1")
+ turn.tool_write(deep, "def foo(): pass")
+ turn.end()
+
+ turn.begin("turn2")
+ turn.tool_write(deep, "def foo(): return 42")
+ turn.end()
+
+ turn1_idx = mgr.get_rewindable_messages()[0][0]
+
+ # Delete everything
+ deep.unlink()
+ (tmp_path / "src" / "pkg").rmdir()
+ (tmp_path / "src").rmdir()
+
+ await mgr.rewind_to_message(turn1_idx, restore_files=True)
+
+ # File didn't exist before turn1 → should be deleted
+ assert not deep.exists()
+
+ async def test_rewind_restores_errors_collected(self, tmp_path: Path) -> None:
+ """When removing a file during rewind fails, errors are returned in the tuple."""
+ mgr, _, turn = self._setup()
+ created_file = tmp_path / "locked.txt"
+
+ turn.begin("turn1")
+ turn.end()
+
+ turn.begin("turn2")
+ # Snapshot runs before write → earlier checkpoints record content=None
+ turn.tool_write(created_file, "created in turn2")
+ turn.end()
+
+ turn1_idx = mgr.get_rewindable_messages()[0][0]
+ with patch(
+ "vibe.core.rewind.manager.os.remove",
+ side_effect=OSError("mocked removal failure"),
+ ):
+ _, errors = await mgr.rewind_to_message(turn1_idx, restore_files=True)
+
+ assert len(errors) == 1
+ assert "Failed to delete file" in errors[0]
+ assert "locked.txt" in errors[0]
diff --git a/tests/snapshots/__snapshots__/test_ui_snapshot_parallel_tool_calls/test_snapshot_parallel_tool_calls_resolved.svg b/tests/snapshots/__snapshots__/test_ui_snapshot_parallel_tool_calls/test_snapshot_parallel_tool_calls_resolved.svg
index b465dda..209e55e 100644
--- a/tests/snapshots/__snapshots__/test_ui_snapshot_parallel_tool_calls/test_snapshot_parallel_tool_calls_resolved.svg
+++ b/tests/snapshots/__snapshots__/test_ui_snapshot_parallel_tool_calls/test_snapshot_parallel_tool_calls_resolved.svg
@@ -110,28 +110,28 @@
-
+
✓Read 1 line from file_0.py
⎢Path: /src/file_0.py
⎢
-⎢
-⎢# content of file_0.py
-⎣
-✓Read 1 line from file_1.py
-⎢Path: /src/file_1.py
-⎢
-⎢
-⎢# content of file_1.py
-⎣
-✓Read 1 line from file_2.py
-⎢Path: /src/file_2.py
-⎢
-⎢
-⎢# content of file_2.py
-⎣
+⎣# content of file_0.py
+✓Read 1 line from file_1.py
+⎢Path: /src/file_1.py
+⎢
+⎣# content of file_1.py
+✓Read 1 line from file_2.py
+⎢Path: /src/file_2.py
+⎢
+⎣# content of file_2.py
+
+
+
+
+
+
diff --git a/tests/snapshots/__snapshots__/test_ui_snapshot_rewind/test_snapshot_rewind_error_shows_toast.svg b/tests/snapshots/__snapshots__/test_ui_snapshot_rewind/test_snapshot_rewind_error_shows_toast.svg
new file mode 100644
index 0000000..1bd6ce6
--- /dev/null
+++ b/tests/snapshots/__snapshots__/test_ui_snapshot_rewind/test_snapshot_rewind_error_shows_toast.svg
@@ -0,0 +1,204 @@
+
diff --git a/tests/snapshots/__snapshots__/test_ui_snapshot_rewind/test_snapshot_rewind_exit_on_escape.svg b/tests/snapshots/__snapshots__/test_ui_snapshot_rewind/test_snapshot_rewind_exit_on_escape.svg
new file mode 100644
index 0000000..7955200
--- /dev/null
+++ b/tests/snapshots/__snapshots__/test_ui_snapshot_rewind/test_snapshot_rewind_exit_on_escape.svg
@@ -0,0 +1,201 @@
+
diff --git a/tests/snapshots/__snapshots__/test_ui_snapshot_rewind/test_snapshot_rewind_navigate_down.svg b/tests/snapshots/__snapshots__/test_ui_snapshot_rewind/test_snapshot_rewind_navigate_down.svg
new file mode 100644
index 0000000..4914668
--- /dev/null
+++ b/tests/snapshots/__snapshots__/test_ui_snapshot_rewind/test_snapshot_rewind_navigate_down.svg
@@ -0,0 +1,203 @@
+
diff --git a/tests/snapshots/__snapshots__/test_ui_snapshot_rewind/test_snapshot_rewind_navigate_up.svg b/tests/snapshots/__snapshots__/test_ui_snapshot_rewind/test_snapshot_rewind_navigate_up.svg
new file mode 100644
index 0000000..51266ac
--- /dev/null
+++ b/tests/snapshots/__snapshots__/test_ui_snapshot_rewind/test_snapshot_rewind_navigate_up.svg
@@ -0,0 +1,203 @@
+
diff --git a/tests/snapshots/__snapshots__/test_ui_snapshot_rewind/test_snapshot_rewind_panel_shown.svg b/tests/snapshots/__snapshots__/test_ui_snapshot_rewind/test_snapshot_rewind_panel_shown.svg
new file mode 100644
index 0000000..4914668
--- /dev/null
+++ b/tests/snapshots/__snapshots__/test_ui_snapshot_rewind/test_snapshot_rewind_panel_shown.svg
@@ -0,0 +1,203 @@
+
diff --git a/tests/snapshots/__snapshots__/test_ui_snapshot_session_resume_injected/test_snapshot_session_resume_hides_injected_messages.svg b/tests/snapshots/__snapshots__/test_ui_snapshot_session_resume_injected/test_snapshot_session_resume_hides_injected_messages.svg
new file mode 100644
index 0000000..8f5f1f7
--- /dev/null
+++ b/tests/snapshots/__snapshots__/test_ui_snapshot_session_resume_injected/test_snapshot_session_resume_hides_injected_messages.svg
@@ -0,0 +1,201 @@
+
diff --git a/tests/snapshots/conftest.py b/tests/snapshots/conftest.py
index 45f941a..c1f9696 100644
--- a/tests/snapshots/conftest.py
+++ b/tests/snapshots/conftest.py
@@ -8,3 +8,8 @@ def _pin_banner_version(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(
"vibe.cli.textual_ui.widgets.banner.banner.__version__", "0.0.0"
)
+
+
+@pytest.fixture(autouse=True)
+def _pin_alt_key(monkeypatch: pytest.MonkeyPatch) -> None:
+ monkeypatch.setattr("vibe.cli.textual_ui.widgets.rewind_app.ALT_KEY", "Alt")
diff --git a/tests/snapshots/test_ui_snapshot_rewind.py b/tests/snapshots/test_ui_snapshot_rewind.py
new file mode 100644
index 0000000..83d723c
--- /dev/null
+++ b/tests/snapshots/test_ui_snapshot_rewind.py
@@ -0,0 +1,142 @@
+from __future__ import annotations
+
+from unittest.mock import patch
+
+import pytest
+from textual.pilot import Pilot
+
+from tests.mock.utils import mock_llm_chunk
+from tests.snapshots.base_snapshot_test_app import BaseSnapshotTestApp
+from tests.snapshots.snap_compare import SnapCompare
+from tests.stubs.fake_backend import FakeBackend
+from vibe.core.rewind import RewindError
+
+
+class RewindSnapshotApp(BaseSnapshotTestApp):
+ """Test app with a multi-turn conversation for rewind snapshots."""
+
+ def __init__(self) -> None:
+ fake_backend = FakeBackend([
+ mock_llm_chunk(content="Hello! How can I help you?")
+ ])
+ super().__init__(backend=fake_backend)
+
+
+async def _send_messages(pilot: Pilot) -> None:
+ """Send three messages to build up conversation history.
+
+ Also patches ``has_file_changes_at`` to always return True so the
+ rewind panel shows the "restore files" option.
+ """
+ for msg in ["first message", "second message", "third message"]:
+ await pilot.press(*msg)
+ await pilot.press("enter")
+ await pilot.pause(0.4)
+
+ app: RewindSnapshotApp = pilot.app # type: ignore[assignment]
+ rm = app.agent_loop.rewind_manager
+ patch.object(rm, "has_file_changes_at", return_value=True).start()
+
+
+def test_snapshot_rewind_panel_shown(snap_compare: SnapCompare) -> None:
+ """Pressing alt+up enters rewind mode and shows the panel."""
+
+ async def run_before(pilot: Pilot) -> None:
+ await _send_messages(pilot)
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.2)
+
+ assert snap_compare(
+ "test_ui_snapshot_rewind.py:RewindSnapshotApp",
+ terminal_size=(120, 36),
+ run_before=run_before,
+ )
+
+
+def test_snapshot_rewind_navigate_up(snap_compare: SnapCompare) -> None:
+ """Pressing alt+up twice selects the second-to-last message."""
+
+ async def run_before(pilot: Pilot) -> None:
+ await _send_messages(pilot)
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.2)
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.2)
+
+ assert snap_compare(
+ "test_ui_snapshot_rewind.py:RewindSnapshotApp",
+ terminal_size=(120, 36),
+ run_before=run_before,
+ )
+
+
+def test_snapshot_rewind_navigate_down(snap_compare: SnapCompare) -> None:
+ """Navigate up twice then down once returns to the last message."""
+
+ async def run_before(pilot: Pilot) -> None:
+ await _send_messages(pilot)
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.2)
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.2)
+ await pilot.press("alt+down")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.2)
+
+ assert snap_compare(
+ "test_ui_snapshot_rewind.py:RewindSnapshotApp",
+ terminal_size=(120, 36),
+ run_before=run_before,
+ )
+
+
+def test_snapshot_rewind_exit_on_escape(snap_compare: SnapCompare) -> None:
+ """Pressing escape exits rewind mode and restores the input panel."""
+
+ async def run_before(pilot: Pilot) -> None:
+ await _send_messages(pilot)
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.2)
+ await pilot.press("escape")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.2)
+
+ assert snap_compare(
+ "test_ui_snapshot_rewind.py:RewindSnapshotApp",
+ terminal_size=(120, 36),
+ run_before=run_before,
+ )
+
+
+def test_snapshot_rewind_error_shows_toast(
+ snap_compare: SnapCompare, monkeypatch: pytest.MonkeyPatch
+) -> None:
+ """When rewind_to_message fails, a toast is shown and rewind mode stays active."""
+
+ async def failing_rewind(*_args, **_kwargs):
+ raise RewindError("Invalid message index: 99")
+
+ async def run_before(pilot: Pilot) -> None:
+ await _send_messages(pilot)
+ app: RewindSnapshotApp = pilot.app # type: ignore[assignment]
+ monkeypatch.setattr(
+ app.agent_loop.rewind_manager, "rewind_to_message", failing_rewind
+ )
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.2)
+ await pilot.press("enter")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.3)
+
+ assert snap_compare(
+ "test_ui_snapshot_rewind.py:RewindSnapshotApp",
+ terminal_size=(120, 36),
+ run_before=run_before,
+ )
diff --git a/tests/snapshots/test_ui_snapshot_session_resume_injected.py b/tests/snapshots/test_ui_snapshot_session_resume_injected.py
new file mode 100644
index 0000000..089b64a
--- /dev/null
+++ b/tests/snapshots/test_ui_snapshot_session_resume_injected.py
@@ -0,0 +1,47 @@
+from __future__ import annotations
+
+from textual.pilot import Pilot
+
+from tests.snapshots.base_snapshot_test_app import BaseSnapshotTestApp
+from tests.snapshots.snap_compare import SnapCompare
+from vibe.core.types import LLMMessage, Role
+
+
+class SnapshotTestAppWithInjectedMessages(BaseSnapshotTestApp):
+ """Simulates resuming a session that contains injected middleware messages.
+
+ The injected plan-mode reminder between the two user turns must not
+ appear in the rendered history.
+ """
+
+ def __init__(self) -> None:
+ super().__init__()
+ self.agent_loop.messages.extend([
+ LLMMessage(role=Role.user, content="Hello, can you help me?"),
+ LLMMessage(role=Role.assistant, content="Sure! What do you need?"),
+ # Middleware-injected plan mode reminder — should be hidden
+ LLMMessage(
+ role=Role.user,
+ content="Plan mode is active. You MUST NOT make any edits.",
+ injected=True,
+ ),
+ LLMMessage(role=Role.user, content="Please read my config file."),
+ LLMMessage(
+ role=Role.assistant, content="Here is the content of your config file."
+ ),
+ ])
+
+
+def test_snapshot_session_resume_hides_injected_messages(
+ snap_compare: SnapCompare,
+) -> None:
+ """Injected middleware messages must not be rendered when resuming a session."""
+
+ async def run_before(pilot: Pilot) -> None:
+ await pilot.pause(0.5)
+
+ assert snap_compare(
+ "test_ui_snapshot_session_resume_injected.py:SnapshotTestAppWithInjectedMessages",
+ terminal_size=(120, 36),
+ run_before=run_before,
+ )
diff --git a/tests/test_agent_observer_streaming.py b/tests/test_agent_observer_streaming.py
index 805aead..554fed8 100644
--- a/tests/test_agent_observer_streaming.py
+++ b/tests/test_agent_observer_streaming.py
@@ -442,12 +442,16 @@ async def test_act_flushes_and_logs_when_streaming_errors(observer_capture) -> N
@pytest.mark.asyncio
async def test_rate_limit(observer_capture) -> None:
observed, observer = observer_capture
- response = httpx.Response(HTTPStatus.TOO_MANY_REQUESTS)
+ response = httpx.Response(
+ HTTPStatus.TOO_MANY_REQUESTS, request=httpx.Request("POST", "http://test")
+ )
+ error = httpx.HTTPStatusError(
+ "rate limited", request=response.request, response=response
+ )
backend_error = BackendErrorBuilder.build_http_error(
provider="mistral",
endpoint="test",
- response=response,
- headers=None,
+ error=error,
model="test-model",
messages=[],
temperature=0.0,
@@ -640,3 +644,27 @@ async def test_empty_content_chunks_do_not_trigger_false_yields() -> None:
("ReasoningEvent", " more reasoning"),
("AssistantEvent", "Actual content"),
]
+
+
+@pytest.mark.asyncio
+async def test_streaming_assistant_event_message_id_matches_stored_message() -> None:
+ backend = FakeBackend([
+ mock_llm_chunk(content="Hello"),
+ mock_llm_chunk(content=" world"),
+ ])
+ agent = build_test_agent_loop(
+ config=make_config(), backend=backend, enable_streaming=True
+ )
+
+ events = [event async for event in agent.act("Test")]
+
+ assistant_events = [e for e in events if isinstance(e, AssistantEvent)]
+ assert len(assistant_events) == 2
+
+ # All chunks of the same assistant turn share one message_id
+ message_ids = {e.message_id for e in assistant_events}
+ assert len(message_ids) == 1
+
+ # The stored LLMMessage must carry that same message_id
+ stored_msg = next(m for m in agent.messages if m.role == Role.assistant)
+ assert stored_msg.message_id == assistant_events[0].message_id
diff --git a/tests/test_ui_rewind.py b/tests/test_ui_rewind.py
new file mode 100644
index 0000000..5a86f6a
--- /dev/null
+++ b/tests/test_ui_rewind.py
@@ -0,0 +1,230 @@
+from __future__ import annotations
+
+import pytest
+
+from tests.conftest import build_test_agent_loop, build_test_vibe_app
+from tests.mock.utils import mock_llm_chunk
+from tests.stubs.fake_backend import FakeBackend
+from vibe.cli.textual_ui.app import BottomApp, VibeApp
+from vibe.cli.textual_ui.widgets.chat_input.container import ChatInputContainer
+from vibe.cli.textual_ui.widgets.messages import UserMessage
+
+
+def _make_app(num_responses: int = 3) -> VibeApp:
+ backend = FakeBackend([
+ mock_llm_chunk(content=f"Response {i + 1}") for i in range(num_responses)
+ ])
+ agent_loop = build_test_agent_loop(backend=backend)
+ return build_test_vibe_app(agent_loop=agent_loop)
+
+
+async def _send_messages(pilot, messages: list[str]) -> None:
+ for msg in messages:
+ await pilot.press(*msg)
+ await pilot.press("enter")
+ await pilot.pause(0.4)
+
+
+@pytest.mark.asyncio
+async def test_rewind_mode_activates_on_alt_up() -> None:
+ app = _make_app()
+ async with app.run_test() as pilot:
+ await _send_messages(pilot, ["hello", "world"])
+
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+
+ assert app._rewind_mode is True
+ assert app._current_bottom_app == BottomApp.Rewind
+
+
+@pytest.mark.asyncio
+async def test_rewind_highlights_last_user_message() -> None:
+ app = _make_app()
+ async with app.run_test() as pilot:
+ await _send_messages(pilot, ["hello", "world"])
+
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+
+ assert app._rewind_highlighted_widget is not None
+ assert app._rewind_highlighted_widget._content == "world"
+
+
+@pytest.mark.asyncio
+async def test_rewind_navigates_to_previous_message() -> None:
+ app = _make_app()
+ async with app.run_test() as pilot:
+ await _send_messages(pilot, ["hello", "world"])
+
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+
+ assert app._rewind_highlighted_widget is not None
+ assert app._rewind_highlighted_widget._content == "hello"
+
+
+@pytest.mark.asyncio
+async def test_rewind_navigates_down() -> None:
+ app = _make_app()
+ async with app.run_test() as pilot:
+ await _send_messages(pilot, ["hello", "world"])
+
+ # Go up twice, then down once
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+ await pilot.press("alt+down")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+
+ assert app._rewind_highlighted_widget is not None
+ assert app._rewind_highlighted_widget._content == "world"
+
+
+@pytest.mark.asyncio
+async def test_rewind_escape_exits_mode() -> None:
+ app = _make_app()
+ async with app.run_test() as pilot:
+ await _send_messages(pilot, ["hello", "world"])
+
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+
+ await pilot.press("escape")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+
+ assert app._rewind_mode is False
+ assert app._rewind_highlighted_widget is None
+ assert app._current_bottom_app == BottomApp.Input
+
+
+@pytest.mark.asyncio
+async def test_rewind_ctrl_p_n_alternate_bindings() -> None:
+ app = _make_app()
+ async with app.run_test() as pilot:
+ await _send_messages(pilot, ["hello", "world"])
+
+ # ctrl+p should enter rewind mode
+ await pilot.press("ctrl+p")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+
+ assert app._rewind_mode is True
+ assert app._rewind_highlighted_widget is not None
+ assert app._rewind_highlighted_widget._content == "world"
+
+ # ctrl+p again goes to previous
+ await pilot.press("ctrl+p")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+
+ assert app._rewind_highlighted_widget is not None
+ assert app._rewind_highlighted_widget._content == "hello"
+
+ # ctrl+n goes back
+ await pilot.press("ctrl+n")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+
+ assert app._rewind_highlighted_widget is not None
+ assert app._rewind_highlighted_widget._content == "world"
+
+
+@pytest.mark.asyncio
+async def test_rewind_confirm_edits_message_and_prefills_input() -> None:
+ app = _make_app()
+ async with app.run_test() as pilot:
+ await _send_messages(pilot, ["hello", "world"])
+
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+
+ # Confirm with enter (selects "Edit message from here")
+ await pilot.press("enter")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.2)
+
+ assert app._rewind_mode is False
+ assert app._current_bottom_app == BottomApp.Input
+
+ # Input should be pre-filled with the rewound message
+ chat_input = app.query_one(ChatInputContainer)
+ assert chat_input.value == "world"
+
+
+@pytest.mark.asyncio
+async def test_rewind_removes_messages_after_selected() -> None:
+ app = _make_app()
+ async with app.run_test() as pilot:
+ await _send_messages(pilot, ["first", "second", "third"])
+
+ # Navigate to "second"
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+
+ assert app._rewind_highlighted_widget is not None
+ assert app._rewind_highlighted_widget._content == "second"
+
+ # Confirm
+ await pilot.press("enter")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.2)
+
+ # Only "first" should remain as a UserMessage
+ messages_area = app.query_one("#messages")
+ user_widgets = [
+ child for child in messages_area.children if isinstance(child, UserMessage)
+ ]
+ assert len(user_widgets) == 1
+ assert user_widgets[0]._content == "first"
+
+
+@pytest.mark.asyncio
+async def test_rewind_does_not_activate_while_agent_running() -> None:
+ app = _make_app()
+ async with app.run_test() as pilot:
+ await _send_messages(pilot, ["hello"])
+
+ app._agent_running = True
+
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+
+ assert app._rewind_mode is False
+
+
+@pytest.mark.asyncio
+async def test_rewind_option_selection_with_number_keys() -> None:
+ app = _make_app()
+ async with app.run_test() as pilot:
+ await _send_messages(pilot, ["hello"])
+
+ await pilot.press("alt+up")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.1)
+
+ # Press "1" to select first option directly
+ await pilot.press("1")
+ await pilot.app.workers.wait_for_complete()
+ await pilot.pause(0.2)
+
+ assert app._rewind_mode is False
+ assert app._current_bottom_app == BottomApp.Input
diff --git a/uv.lock b/uv.lock
index ebd7f17..493ea9b 100644
--- a/uv.lock
+++ b/uv.lock
@@ -561,6 +561,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/62/a1/3d680cbfd5f4b8f15abc1d571870c5fc3e594bb582bc3b64ea099db13e56/jinja2-3.1.6-py3-none-any.whl", hash = "sha256:85ece4451f492d0c13c5dd7c13a64681a86afae63a5f347908daf103ce6d2f67", size = 134899, upload-time = "2025-03-05T20:05:00.369Z" },
]
+[[package]]
+name = "jsonpath-python"
+version = "1.1.5"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/2d/db/2f4ecc24da35c6142b39c353d5b7c16eef955cc94b35a48d3fa47996d7c3/jsonpath_python-1.1.5.tar.gz", hash = "sha256:ceea2efd9e56add09330a2c9631ea3d55297b9619348c1055e5bfb9cb0b8c538", size = 87352, upload-time = "2026-03-17T06:16:40.597Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/28/50/1a313fb700526b134c71eb8a225d8b83be0385dbb0204337b4379c698cef/jsonpath_python-1.1.5-py3-none-any.whl", hash = "sha256:a60315404d70a65e76c9a782c84e50600480221d94a58af47b7b4d437351cb4b", size = 14090, upload-time = "2026-03-17T06:16:39.152Z" },
+]
+
[[package]]
name = "jsonschema"
version = "4.26.0"
@@ -770,7 +779,7 @@ wheels = [
[[package]]
name = "mistral-vibe"
-version = "2.6.2"
+version = "2.7.0"
source = { editable = "." }
dependencies = [
{ name = "agent-client-protocol" },
@@ -843,7 +852,7 @@ requires-dist = [
{ name = "keyring", specifier = ">=25.6.0" },
{ name = "markdownify", specifier = ">=1.2.2" },
{ name = "mcp", specifier = ">=1.14.0" },
- { name = "mistralai", specifier = "==2.0.0" },
+ { name = "mistralai", specifier = "==2.1.3" },
{ name = "opentelemetry-api", specifier = ">=1.39.1" },
{ name = "opentelemetry-exporter-otlp-proto-http", specifier = ">=1.39.1" },
{ name = "opentelemetry-sdk", specifier = ">=1.39.1" },
@@ -891,20 +900,21 @@ dev = [
[[package]]
name = "mistralai"
-version = "2.0.0"
+version = "2.1.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "eval-type-backport" },
{ name = "httpx" },
+ { name = "jsonpath-python" },
{ name = "opentelemetry-api" },
{ name = "opentelemetry-semantic-conventions" },
{ name = "pydantic" },
{ name = "python-dateutil" },
{ name = "typing-inspection" },
]
-sdist = { url = "https://files.pythonhosted.org/packages/ec/5c/22fd7d1ec7e333f83dc5e2d0b176952a5d9a1f08519898c55616c92a81d8/mistralai-2.0.0.tar.gz", hash = "sha256:acb7937a53119ece67f4978809d4cf630fbf54b4dfe85c0eeae778ac40850fab", size = 317705, upload-time = "2026-03-10T17:12:48.616Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/7d/98/5fe39d514c19477f06a07e088ce4a44c2e60ac9deebefb9e2c8ed8ef87d2/mistralai-2.1.3.tar.gz", hash = "sha256:0c5de4855b043cd0582406d5c1ddfd91e176f484a158e6ee0b4a0054231be266", size = 331929, upload-time = "2026-03-23T15:00:29.579Z" }
wheels = [
- { url = "https://files.pythonhosted.org/packages/b8/95/1587d555837bf635db28e2acee366cc47edc473cd3155515be14acced91b/mistralai-2.0.0-py3-none-any.whl", hash = "sha256:e551fc36d60d4c969140e37f10eab04986480e487f357c900da05d740b9a0baf", size = 709642, upload-time = "2026-03-10T17:12:50.104Z" },
+ { url = "https://files.pythonhosted.org/packages/15/7c/f91a26bf469c1cff57325379afa112baeb113ac577d28e69dd408cee5745/mistralai-2.1.3-py3-none-any.whl", hash = "sha256:26daac3bdc69fc2dd58f2c421710eb34131be7883b44a9ea81904a6306e6a90a", size = 754931, upload-time = "2026-03-23T15:00:30.934Z" },
]
[[package]]
diff --git a/vibe/__init__.py b/vibe/__init__.py
index f68104e..4eee166 100644
--- a/vibe/__init__.py
+++ b/vibe/__init__.py
@@ -3,4 +3,4 @@ from __future__ import annotations
from pathlib import Path
VIBE_ROOT = Path(__file__).parent
-__version__ = "2.6.2"
+__version__ = "2.7.0"
diff --git a/vibe/cli/commands.py b/vibe/cli/commands.py
index 6a343c9..0271132 100644
--- a/vibe/cli/commands.py
+++ b/vibe/cli/commands.py
@@ -1,6 +1,9 @@
from __future__ import annotations
from dataclasses import dataclass
+import sys
+
+ALT_KEY = "⌥" if sys.platform == "darwin" else "Alt"
@dataclass
@@ -97,6 +100,11 @@ class CommandRegistry:
description="Uninstall the Lean 4 agent",
handler="_uninstall_lean",
),
+ "rewind": Command(
+ aliases=frozenset(["/rewind"]),
+ description="Rewind to a previous message",
+ handler="_start_rewind_mode",
+ ),
}
for command in excluded_commands:
@@ -125,6 +133,7 @@ class CommandRegistry:
"- `Ctrl+G` Edit input in external editor",
"- `Ctrl+O` Toggle tool output view",
"- `Shift+Tab` Toggle auto-approve mode",
+ f"- `{ALT_KEY}+↑↓` / `Ctrl+P/N` Rewind to previous/next message",
"",
"### Special Features",
"",
diff --git a/vibe/cli/textual_ui/app.py b/vibe/cli/textual_ui/app.py
index 1ef3c69..ab3ac64 100644
--- a/vibe/cli/textual_ui/app.py
+++ b/vibe/cli/textual_ui/app.py
@@ -69,6 +69,7 @@ from vibe.cli.textual_ui.widgets.no_markup_static import NoMarkupStatic
from vibe.cli.textual_ui.widgets.path_display import PathDisplay
from vibe.cli.textual_ui.widgets.proxy_setup_app import ProxySetupApp
from vibe.cli.textual_ui.widgets.question_app import QuestionApp
+from vibe.cli.textual_ui.widgets.rewind_app import RewindApp
from vibe.cli.textual_ui.widgets.session_picker import SessionPickerApp
from vibe.cli.textual_ui.widgets.teleport_message import TeleportMessage
from vibe.cli.textual_ui.widgets.tools import ToolResultMessage
@@ -114,6 +115,7 @@ from vibe.core.autocompletion.path_prompt_adapter import render_path_prompt
from vibe.core.config import VibeConfig
from vibe.core.logger import logger
from vibe.core.paths import HISTORY_FILE
+from vibe.core.rewind import RewindError
from vibe.core.session.session_loader import SessionLoader
from vibe.core.teleport.types import (
TeleportAuthCompleteEvent,
@@ -166,6 +168,7 @@ class BottomApp(StrEnum):
ModelPicker = auto()
ProxySetup = auto()
Question = auto()
+ Rewind = auto()
SessionPicker = auto()
Voice = auto()
@@ -272,6 +275,10 @@ class VibeApp(App): # noqa: PLR0904
Binding(
"shift+down", "scroll_chat_down", "Scroll Down", show=False, priority=True
),
+ Binding("alt+up", "rewind_prev", "Rewind Previous", show=False, priority=True),
+ Binding("ctrl+p", "rewind_prev", "Rewind Previous", show=False, priority=True),
+ Binding("alt+down", "rewind_next", "Rewind Next", show=False, priority=True),
+ Binding("ctrl+n", "rewind_next", "Rewind Next", show=False, priority=True),
]
def __init__(
@@ -347,6 +354,9 @@ class VibeApp(App): # noqa: PLR0904
self._speak_task: asyncio.Task[None] | None = None
self._cancel_summary: Callable[[], bool] | None = None
+ self._rewind_mode = False
+ self._rewind_highlighted_widget: UserMessage | None = None
+
@property
def config(self) -> VibeConfig:
return self.agent_loop.config
@@ -750,7 +760,10 @@ class VibeApp(App): # noqa: PLR0904
)
async def _handle_user_message(self, message: str) -> None:
- user_message = UserMessage(message)
+ # message_index is where the user message will land in agent_loop.messages
+ # (checkpoint is created in agent_loop.act())
+ message_index = len(self.agent_loop.messages)
+ user_message = UserMessage(message, message_index=message_index)
await self._mount_and_scroll(user_message)
if self.agent_loop.telemetry_client.is_active():
@@ -1471,6 +1484,12 @@ class VibeApp(App): # noqa: PLR0904
await self._switch_from_input(QuestionApp(args=args), scroll=True)
async def _switch_to_input_app(self) -> None:
+ if self._chat_input_container:
+ self._chat_input_container.disabled = False
+ self._chat_input_container.display = True
+ self._current_bottom_app = BottomApp.Input
+ self._refresh_profile_widgets()
+
for app in BottomApp:
if app != BottomApp.Input:
try:
@@ -1479,10 +1498,6 @@ class VibeApp(App): # noqa: PLR0904
pass
if self._chat_input_container:
- self._chat_input_container.disabled = False
- self._chat_input_container.display = True
- self._current_bottom_app = BottomApp.Input
- self._refresh_profile_widgets()
self.call_after_refresh(self._chat_input_container.focus_input)
chat = self._cached_chat or self.query_one("#chat", ChatScroll)
if chat.is_at_bottom:
@@ -1505,6 +1520,8 @@ class VibeApp(App): # noqa: PLR0904
self.query_one(QuestionApp).focus()
case BottomApp.SessionPicker:
self.query_one(SessionPickerApp).focus()
+ case BottomApp.Rewind:
+ self.query_one(RewindApp).focus()
case BottomApp.Voice:
self.query_one(VoiceApp).focus()
case app:
@@ -1562,6 +1579,196 @@ class VibeApp(App): # noqa: PLR0904
pass
self._last_escape_time = None
+ # --- Rewind mode ---
+
+ def _get_user_message_widgets(self) -> list[UserMessage]:
+ """Return all UserMessage widgets currently visible in #messages."""
+ messages_area = self._cached_messages_area or self.query_one("#messages")
+ return [
+ child for child in messages_area.children if isinstance(child, UserMessage)
+ ]
+
+ def _start_rewind_mode(self) -> None:
+ self.action_rewind_prev()
+
+ def action_rewind_prev(self) -> None:
+ if self._agent_running:
+ return
+
+ user_widgets = self._get_user_message_widgets()
+ if not user_widgets:
+ return
+
+ if not self._rewind_mode:
+ self._rewind_mode = True
+ target = user_widgets[-1]
+ elif self._rewind_highlighted_widget is not None:
+ try:
+ idx = user_widgets.index(self._rewind_highlighted_widget)
+ except ValueError:
+ idx = len(user_widgets)
+ if idx <= 0:
+ self.run_worker(self._rewind_prev_at_top(), exclusive=False)
+ return
+ target = user_widgets[idx - 1]
+ else:
+ target = user_widgets[-1]
+
+ self.run_worker(self._select_rewind_widget(target), exclusive=False)
+
+ async def _rewind_prev_at_top(self) -> None:
+ """Handle alt+up when already at the topmost visible user message."""
+ if self._load_more.widget is not None and self._windowing.has_backfill:
+ await self.on_history_load_more_requested(HistoryLoadMoreRequested())
+ user_widgets = self._get_user_message_widgets()
+ if user_widgets and self._rewind_highlighted_widget is not None:
+ # Find the current highlighted widget in the refreshed list
+ # and select the one above it
+ try:
+ idx = user_widgets.index(self._rewind_highlighted_widget)
+ except ValueError:
+ idx = 0
+ if idx > 0:
+ await self._select_rewind_widget(user_widgets[idx - 1])
+ return
+ # No load more or already first message: scroll to top
+ chat = self._cached_chat or self.query_one("#chat", ChatScroll)
+ self.call_after_refresh(chat.scroll_home, animate=False)
+
+ def action_rewind_next(self) -> None:
+ if not self._rewind_mode:
+ return
+
+ if self._rewind_highlighted_widget is None:
+ return
+
+ user_widgets = self._get_user_message_widgets()
+ try:
+ idx = user_widgets.index(self._rewind_highlighted_widget)
+ except ValueError:
+ return
+ if idx >= len(user_widgets) - 1:
+ return
+
+ self.run_worker(
+ self._select_rewind_widget(user_widgets[idx + 1]), exclusive=False
+ )
+
+ async def _select_rewind_widget(self, widget: UserMessage) -> None:
+ """Highlight the given user message widget and show the rewind panel."""
+ if self._rewind_highlighted_widget is not None:
+ self._rewind_highlighted_widget.remove_class("rewind-selected")
+
+ widget.add_class("rewind-selected")
+ self._rewind_highlighted_widget = widget
+
+ msg_index = widget.message_index
+ has_file_changes = (
+ msg_index is not None
+ and self.agent_loop.rewind_manager.has_file_changes_at(msg_index)
+ )
+
+ await self._switch_to_rewind_app(
+ widget.get_content(), has_file_changes=has_file_changes
+ )
+
+ chat = self._cached_chat or self.query_one("#chat", ChatScroll)
+ self.call_after_refresh(chat.scroll_to_widget, widget, animate=False, top=True)
+
+ async def _switch_to_rewind_app(
+ self, message_preview: str, *, has_file_changes: bool
+ ) -> None:
+ """Show the rewind action panel at the bottom."""
+ if self._current_bottom_app == BottomApp.Rewind:
+ # Reuse existing widget if the option set hasn't changed
+ try:
+ existing = self.query_one(RewindApp)
+ if existing.has_file_changes == has_file_changes:
+ existing.update_preview(message_preview)
+ return
+ await existing.remove()
+ except Exception:
+ pass
+
+ rewind_app = RewindApp(
+ message_preview=message_preview, has_file_changes=has_file_changes
+ )
+ bottom_container = self.query_one("#bottom-app-container")
+ self._current_bottom_app = BottomApp.Rewind
+ await bottom_container.mount(rewind_app)
+ self.call_after_refresh(rewind_app.focus)
+ else:
+ rewind_app = RewindApp(
+ message_preview=message_preview, has_file_changes=has_file_changes
+ )
+ await self._switch_from_input(rewind_app)
+
+ def _clear_rewind_state(self) -> None:
+ if self._rewind_highlighted_widget is not None:
+ self._rewind_highlighted_widget.remove_class("rewind-selected")
+ self._rewind_highlighted_widget = None
+ self._rewind_mode = False
+
+ async def _exit_rewind_mode(self) -> None:
+ """Exit rewind mode and restore the input panel."""
+ self._clear_rewind_state()
+ await self._switch_to_input_app()
+
+ async def on_rewind_app_rewind_with_restore(
+ self, message: RewindApp.RewindWithRestore
+ ) -> None:
+ await self._execute_rewind(restore_files=True)
+
+ async def on_rewind_app_rewind_without_restore(
+ self, message: RewindApp.RewindWithoutRestore
+ ) -> None:
+ await self._execute_rewind(restore_files=False)
+
+ async def _execute_rewind(self, *, restore_files: bool) -> None:
+ """Fork the session at the selected user message."""
+ if not self._rewind_mode or self._rewind_highlighted_widget is None:
+ return
+
+ target_widget = self._rewind_highlighted_widget
+ msg_index = target_widget.message_index
+
+ if msg_index is None:
+ return
+
+ try:
+ (
+ message_content,
+ restore_errors,
+ ) = await self.agent_loop.rewind_manager.rewind_to_message(
+ msg_index, restore_files=restore_files
+ )
+ except RewindError as exc:
+ self.notify(str(exc), severity="error")
+ return
+
+ for error in restore_errors:
+ self.notify(error, severity="warning")
+
+ # Remove UI widgets from the selected message onward
+ messages_area = self._cached_messages_area or self.query_one("#messages")
+ children = list(messages_area.children)
+ try:
+ target_idx = children.index(target_widget)
+ except ValueError:
+ target_idx = len(children)
+ to_remove = children[target_idx:]
+ if to_remove:
+ await messages_area.remove_children(to_remove)
+
+ self._clear_rewind_state()
+
+ # Switch back to input and pre-fill with the original message
+ await self._switch_to_input_app()
+ if self._chat_input_container:
+ self._chat_input_container.value = message_content
+
+ # --- End rewind mode ---
+
def _handle_input_app_escape(self) -> None:
try:
input_widget = self.query_one(ChatInputContainer)
@@ -1614,6 +1821,11 @@ class VibeApp(App): # noqa: PLR0904
self._handle_session_picker_app_escape()
return
+ if self._current_bottom_app == BottomApp.Rewind:
+ self.run_worker(self._exit_rewind_mode(), exclusive=False)
+ self._last_escape_time = None
+ return
+
if (
self._current_bottom_app == BottomApp.Input
and self._last_escape_time is not None
diff --git a/vibe/cli/textual_ui/app.tcss b/vibe/cli/textual_ui/app.tcss
index c03d50a..37fe1d7 100644
--- a/vibe/cli/textual_ui/app.tcss
+++ b/vibe/cli/textual_ui/app.tcss
@@ -529,6 +529,24 @@ StatusMessage {
height: auto;
}
+.tool-result-widget Markdown,
+.tool-approval-widget Markdown {
+ padding: 0;
+ margin: 0;
+}
+
+.tool-result-widget Markdown > *,
+.tool-approval-widget Markdown > * {
+ margin-top: 0;
+ margin-bottom: 0;
+}
+
+.tool-result-widget Markdown MarkdownFence,
+.tool-approval-widget Markdown MarkdownFence {
+ margin: 0;
+ padding: 0;
+}
+
.tool-call-detail,
.tool-result-detail {
height: auto;
@@ -1111,6 +1129,51 @@ FeedbackBar {
margin-left: 1;
}
+.user-message.rewind-selected {
+ .user-message-content {
+ text-style: bold reverse;
+ }
+}
+
+#rewind-app {
+ width: 100%;
+ height: auto;
+ background: transparent;
+ border: solid ansi_bright_black;
+ padding: 0 1;
+ margin: 0;
+}
+
+#rewind-content {
+ width: 100%;
+ height: auto;
+}
+
+.rewind-title {
+ height: auto;
+ text-style: bold;
+ color: ansi_blue;
+}
+
+.rewind-option {
+ height: auto;
+ color: ansi_default;
+}
+
+.rewind-cursor-selected {
+ color: ansi_blue;
+ text-style: bold;
+}
+
+.rewind-option-unselected {
+ color: ansi_default;
+}
+
+.rewind-help {
+ height: auto;
+ color: ansi_bright_black;
+}
+
#feedback-text {
width: auto;
height: auto;
diff --git a/vibe/cli/textual_ui/widgets/messages.py b/vibe/cli/textual_ui/widgets/messages.py
index 81aacb9..1cf7bdf 100644
--- a/vibe/cli/textual_ui/widgets/messages.py
+++ b/vibe/cli/textual_ui/widgets/messages.py
@@ -38,11 +38,17 @@ class ExpandingBorder(NonSelectableStatic):
class UserMessage(Static):
- def __init__(self, content: str, pending: bool = False) -> None:
+ def __init__(
+ self, content: str, pending: bool = False, message_index: int | None = None
+ ) -> None:
super().__init__()
self.add_class("user-message")
self._content = content
self._pending = pending
+ self.message_index: int | None = message_index
+
+ def get_content(self) -> str:
+ return self._content
def compose(self) -> ComposeResult:
with Horizontal(classes="user-message-container"):
diff --git a/vibe/cli/textual_ui/widgets/rewind_app.py b/vibe/cli/textual_ui/widgets/rewind_app.py
new file mode 100644
index 0000000..e1dd090
--- /dev/null
+++ b/vibe/cli/textual_ui/widgets/rewind_app.py
@@ -0,0 +1,147 @@
+from __future__ import annotations
+
+from enum import StrEnum, auto
+from typing import ClassVar
+
+from textual import events
+from textual.app import ComposeResult
+from textual.binding import Binding, BindingType
+from textual.containers import Container, Vertical
+from textual.message import Message
+from textual.widgets import Static
+
+from vibe.cli.commands import ALT_KEY
+from vibe.cli.textual_ui.widgets.no_markup_static import NoMarkupStatic
+
+
+class _RewindAction(StrEnum):
+ EDIT_AND_RESTORE = auto()
+ EDIT_ONLY = auto()
+
+
+class RewindApp(Container):
+ """Bottom panel widget for rewind mode actions."""
+
+ can_focus = True
+ can_focus_children = False
+
+ BINDINGS: ClassVar[list[BindingType]] = [
+ Binding("up", "move_up", "Up", show=False),
+ Binding("down", "move_down", "Down", show=False),
+ Binding("enter", "select", "Select", show=False),
+ Binding("1", "select_1", "Option 1", show=False),
+ Binding("2", "select_2", "Option 2", show=False),
+ ]
+
+ class RewindWithRestore(Message):
+ """User chose to edit the message and restore files."""
+
+ class RewindWithoutRestore(Message):
+ """User chose to edit the message without restoring files."""
+
+ def __init__(self, message_preview: str, *, has_file_changes: bool) -> None:
+ super().__init__(id="rewind-app")
+ self._message_preview = message_preview
+ self._has_file_changes = has_file_changes
+ self.selected_option = 0
+ self.option_widgets: list[Static] = []
+ self._title_widget: NoMarkupStatic | None = None
+ self._options = self._build_options()
+
+ def _build_options(self) -> list[tuple[str, _RewindAction]]:
+ options: list[tuple[str, _RewindAction]] = []
+ if self._has_file_changes:
+ options.append((
+ "Edit & restore files to this point",
+ _RewindAction.EDIT_AND_RESTORE,
+ ))
+ edit_only_label = (
+ "Edit without restoring files"
+ if self._has_file_changes
+ else "Edit message from here"
+ )
+ options.append((edit_only_label, _RewindAction.EDIT_ONLY))
+ return options
+
+ @property
+ def has_file_changes(self) -> bool:
+ return self._has_file_changes
+
+ def update_preview(self, message_preview: str) -> None:
+ self._message_preview = message_preview
+ if self._title_widget is not None:
+ self._title_widget.update(f"Rewind to: {message_preview[:80]}")
+
+ def compose(self) -> ComposeResult:
+ with Vertical(id="rewind-content"):
+ self._title_widget = NoMarkupStatic(
+ f"Rewind to: {self._message_preview[:80]}", classes="rewind-title"
+ )
+ yield self._title_widget
+ yield NoMarkupStatic("")
+ for _ in range(len(self._options)):
+ widget = NoMarkupStatic("", classes="rewind-option")
+ self.option_widgets.append(widget)
+ yield widget
+ yield NoMarkupStatic("")
+ yield NoMarkupStatic(
+ f"{ALT_KEY}+↑↓ or Ctrl+P/N browse messages ↑↓ pick option Enter confirm ESC cancel",
+ classes="rewind-help",
+ )
+
+ async def on_mount(self) -> None:
+ self._update_options()
+ self.focus()
+
+ def _update_options(self) -> None:
+ for idx, ((text, _action), widget) in enumerate(
+ zip(self._options, self.option_widgets, strict=True)
+ ):
+ is_selected = idx == self.selected_option
+ cursor = "› " if is_selected else " "
+ option_text = f"{cursor}{idx + 1}. {text}"
+
+ widget.update(option_text)
+
+ widget.remove_class("rewind-cursor-selected")
+ widget.remove_class("rewind-option-unselected")
+
+ if is_selected:
+ widget.add_class("rewind-cursor-selected")
+ else:
+ widget.add_class("rewind-option-unselected")
+
+ def _option_count(self) -> int:
+ return len(self._options)
+
+ def action_move_up(self) -> None:
+ self.selected_option = (self.selected_option - 1) % self._option_count()
+ self._update_options()
+
+ def action_move_down(self) -> None:
+ self.selected_option = (self.selected_option + 1) % self._option_count()
+ self._update_options()
+
+ def action_select(self) -> None:
+ self._handle_selection(self.selected_option)
+
+ def action_select_1(self) -> None:
+ if self._option_count() >= 1:
+ self.selected_option = 0
+ self._handle_selection(0)
+
+ def action_select_2(self) -> None:
+ if self._option_count() >= 2: # noqa: PLR2004
+ self.selected_option = 1
+ self._handle_selection(1)
+
+ def _handle_selection(self, option: int) -> None:
+ _, action = self._options[option]
+ match action:
+ case _RewindAction.EDIT_AND_RESTORE:
+ self.post_message(self.RewindWithRestore())
+ case _RewindAction.EDIT_ONLY:
+ self.post_message(self.RewindWithoutRestore())
+
+ def on_blur(self, event: events.Blur) -> None:
+ self.call_after_refresh(self.focus)
diff --git a/vibe/cli/textual_ui/widgets/session_picker.py b/vibe/cli/textual_ui/widgets/session_picker.py
index 11404b5..e557272 100644
--- a/vibe/cli/textual_ui/widgets/session_picker.py
+++ b/vibe/cli/textual_ui/widgets/session_picker.py
@@ -98,8 +98,7 @@ class SessionPickerApp(Container):
with Vertical(id="sessionpicker-content"):
yield OptionList(*options, id="sessionpicker-options")
yield NoMarkupStatic(
- "Up/Down Navigate Enter Select Esc Cancel",
- classes="sessionpicker-help",
+ "↑↓ Navigate Enter Select Esc Cancel", classes="sessionpicker-help"
)
def on_mount(self) -> None:
diff --git a/vibe/cli/textual_ui/windowing/history.py b/vibe/cli/textual_ui/windowing/history.py
index 2a70404..cdb743c 100644
--- a/vibe/cli/textual_ui/windowing/history.py
+++ b/vibe/cli/textual_ui/windowing/history.py
@@ -39,12 +39,17 @@ def build_history_widgets(
) -> list[Widget]:
widgets: list[Widget] = []
- for offset, msg in enumerate(batch):
- history_index = start_index + offset
+ for history_index, msg in zip(
+ range(start_index, start_index + len(batch)), batch, strict=True
+ ):
+ if msg.injected:
+ continue
match msg.role:
case Role.user:
if msg.content:
- widget = UserMessage(msg.content)
+ # history_index is 0-based in non-system messages;
+ # agent_loop.messages index = history_index + 1 (system msg at 0)
+ widget = UserMessage(msg.content, message_index=history_index + 1)
widgets.append(widget)
history_widget_indices[widget] = history_index
diff --git a/vibe/cli/textual_ui/windowing/state.py b/vibe/cli/textual_ui/windowing/state.py
index d9cdf3f..d6913cc 100644
--- a/vibe/cli/textual_ui/windowing/state.py
+++ b/vibe/cli/textual_ui/windowing/state.py
@@ -60,9 +60,23 @@ class SessionWindowing:
self._backfill_cursor = 0
return False
if visible_indices:
- backfill_end = min(visible_indices)
+ oldest_widget = min(visible_indices)
+ # _backfill_cursor is the first history index in the loaded window (tail + prepends).
+ # Oldest widgets can start *after* that when the tail begins with injected-only slots
+ # (no widgets). Using min(visible_indices) alone would shrink the backfill prefix into
+ # the already-mounted tail and break load-more batch start_index alignment.
+ if oldest_widget > self._backfill_cursor:
+ prefix = history_messages[self._backfill_cursor : oldest_widget]
+ backfill_end = (
+ self._backfill_cursor
+ if prefix and all(m.injected for m in prefix)
+ else oldest_widget
+ )
+ else:
+ backfill_end = self._backfill_cursor
else:
backfill_end = max(len(history_messages) - visible_history_widgets_count, 0)
+ backfill_end = min(backfill_end, len(history_messages))
self._backfill_messages = history_messages[:backfill_end]
self._backfill_cursor = len(self._backfill_messages)
return self._backfill_cursor > 0
diff --git a/vibe/core/agent_loop.py b/vibe/core/agent_loop.py
index 5147fec..4d73e2f 100644
--- a/vibe/core/agent_loop.py
+++ b/vibe/core/agent_loop.py
@@ -45,6 +45,7 @@ from vibe.core.middleware import (
)
from vibe.core.plan_session import PlanSession
from vibe.core.prompts import UtilityPrompt
+from vibe.core.rewind import RewindManager
from vibe.core.session.session_logger import SessionLogger
from vibe.core.session.session_migration import migrate_sessions_entrypoint
from vibe.core.skills.manager import SkillManager
@@ -213,6 +214,11 @@ class AgentLoop:
config_getter=lambda: self.config, session_id_getter=lambda: self.session_id
)
self.session_logger = SessionLogger(config.session_logging, self.session_id)
+ self.rewind_manager = RewindManager(
+ messages=self.messages,
+ save_messages=self._save_messages,
+ reset_session=self._reset_session,
+ )
self._teleport_service: TeleportService | None = None
thread = Thread(
@@ -340,6 +346,7 @@ class AgentLoop:
async def act(self, msg: str) -> AsyncGenerator[BaseEvent]:
self._clean_message_history()
+ self.rewind_manager.create_checkpoint()
try:
model_name = self.config.get_active_model().name
except ValueError:
@@ -447,7 +454,7 @@ class AgentLoop:
case MiddlewareAction.INJECT_MESSAGE:
if result.message:
injected_message = LLMMessage(
- role=Role.user, content=result.message
+ role=Role.user, content=result.message, injected=True
)
self.messages.append(injected_message)
@@ -693,6 +700,10 @@ class AgentLoop:
self.stats.tool_calls_agreed += 1
+ snapshot = tool_instance.get_file_snapshot(tool_call.validated_args)
+ if snapshot is not None:
+ self.rewind_manager.add_snapshot(snapshot)
+
start_time = time.perf_counter()
result_model = None
async for item in tool_instance.invoke(
@@ -928,7 +939,7 @@ class AgentLoop:
try:
start_time = time.perf_counter()
usage = LLMUsage()
- chunk_agg = LLMChunk(message=LLMMessage(role=Role.assistant))
+ chunk_agg: LLMChunk | None = None
async for chunk in self.backend.complete_streaming(
model=active_model,
messages=self.messages,
@@ -945,12 +956,16 @@ class AgentLoop:
chunk.message
)
processed_chunk = LLMChunk(message=processed_message, usage=chunk.usage)
- chunk_agg += processed_chunk
+ chunk_agg = (
+ processed_chunk
+ if chunk_agg is None
+ else chunk_agg + processed_chunk
+ )
usage += chunk.usage or LLMUsage()
yield processed_chunk
end_time = time.perf_counter()
- if chunk_agg.usage is None:
+ if chunk_agg is None or chunk_agg.usage is None:
raise AgentLoopLLMResponseError(
"Usage data missing in final chunk of streamed completion"
)
@@ -1259,10 +1274,7 @@ class AgentLoop:
self.tool_manager, self.config, self.skill_manager, self.agent_manager
)
- self.messages.reset([
- LLMMessage(role=Role.system, content=new_system_prompt),
- *[msg for msg in self.messages if msg.role != Role.system],
- ])
+ self.messages.update_system_prompt(new_system_prompt)
if len(self.messages) == 1:
self.stats.reset_context_state()
diff --git a/vibe/core/agents/models.py b/vibe/core/agents/models.py
index 6a92005..e46c712 100644
--- a/vibe/core/agents/models.py
+++ b/vibe/core/agents/models.py
@@ -163,8 +163,7 @@ LEAN = AgentProfile(
"name": "mistral-testing",
"api_base": "https://api.mistral.ai/v1",
"api_key_env_var": "MISTRAL_API_KEY",
- "api_style": "reasoning",
- "backend": "generic",
+ "backend": "mistral",
}
],
"models": [
diff --git a/vibe/core/llm/backend/generic.py b/vibe/core/llm/backend/generic.py
index 57f0fb9..34ec9fa 100644
--- a/vibe/core/llm/backend/generic.py
+++ b/vibe/core/llm/backend/generic.py
@@ -258,8 +258,7 @@ class GenericBackend:
raise BackendErrorBuilder.build_http_error(
provider=self._provider.name,
endpoint=url,
- response=e.response,
- headers=e.response.headers,
+ error=e,
model=model.name,
messages=messages,
temperature=temperature,
@@ -327,8 +326,7 @@ class GenericBackend:
raise BackendErrorBuilder.build_http_error(
provider=self._provider.name,
endpoint=url,
- response=e.response,
- headers=e.response.headers,
+ error=e,
model=model.name,
messages=messages,
temperature=temperature,
diff --git a/vibe/core/llm/backend/mistral.py b/vibe/core/llm/backend/mistral.py
index 6cfa4c3..25db5e4 100644
--- a/vibe/core/llm/backend/mistral.py
+++ b/vibe/core/llm/backend/mistral.py
@@ -4,7 +4,7 @@ from collections.abc import AsyncGenerator, Sequence
import json
import os
import types
-from typing import TYPE_CHECKING, NamedTuple, cast
+from typing import TYPE_CHECKING, Literal, NamedTuple, cast
import httpx
from mistralai.client import Mistral
@@ -32,7 +32,10 @@ from mistralai.client.models import (
from mistralai.client.utils.retries import BackoffStrategy, RetryConfig
from vibe.core.llm.exceptions import BackendErrorBuilder
-from vibe.core.llm.message_utils import merge_consecutive_user_messages
+from vibe.core.llm.message_utils import (
+ merge_consecutive_user_messages,
+ strip_reasoning as strip_reasoning_message,
+)
from vibe.core.types import (
AvailableTool,
Content,
@@ -168,6 +171,18 @@ class MistralMapper:
for tool_call in tool_calls
]
+ def strip_reasoning(self, msg: LLMMessage) -> LLMMessage:
+ return strip_reasoning_message(msg)
+
+
+ReasoningEffortValue = Literal["none", "high"]
+
+_THINKING_TO_REASONING_EFFORT: dict[str, ReasoningEffortValue] = {
+ "low": "none",
+ "medium": "high",
+ "high": "high",
+}
+
class MistralBackend:
def __init__(self, provider: ProviderConfig, timeout: float = 720.0) -> None:
@@ -253,6 +268,14 @@ class MistralBackend:
) -> LLMChunk:
try:
merged_messages = merge_consecutive_user_messages(messages)
+ reasoning_effort = _THINKING_TO_REASONING_EFFORT.get(model.thinking)
+ if reasoning_effort is not None:
+ temperature = 1.0
+ else:
+ merged_messages = [
+ strip_reasoning_message(msg) for msg in merged_messages
+ ]
+
response = await self._get_client().chat.complete_async(
model=model.name,
messages=[self._mapper.prepare_message(msg) for msg in merged_messages],
@@ -267,6 +290,7 @@ class MistralBackend:
http_headers=extra_headers,
metadata=metadata,
stream=False,
+ reasoning_effort=reasoning_effort,
)
parsed = (
@@ -295,8 +319,7 @@ class MistralBackend:
raise BackendErrorBuilder.build_http_error(
provider=self._provider.name,
endpoint=self._server_url,
- response=e.raw_response,
- headers=e.raw_response.headers,
+ error=e,
model=model.name,
messages=messages,
temperature=temperature,
@@ -329,6 +352,14 @@ class MistralBackend:
) -> AsyncGenerator[LLMChunk, None]:
try:
merged_messages = merge_consecutive_user_messages(messages)
+ reasoning_effort = _THINKING_TO_REASONING_EFFORT.get(model.thinking)
+ if reasoning_effort is not None:
+ temperature = 1.0
+ else:
+ merged_messages = [
+ strip_reasoning_message(msg) for msg in merged_messages
+ ]
+
stream = await self._get_client().chat.stream_async(
model=model.name,
messages=[self._mapper.prepare_message(msg) for msg in merged_messages],
@@ -342,6 +373,7 @@ class MistralBackend:
else None,
http_headers=extra_headers,
metadata=metadata,
+ reasoning_effort=reasoning_effort,
)
correlation_id = stream.response.headers.get("mistral-correlation-id")
async for chunk in stream:
@@ -376,8 +408,7 @@ class MistralBackend:
raise BackendErrorBuilder.build_http_error(
provider=self._provider.name,
endpoint=self._server_url,
- response=e.raw_response,
- headers=e.raw_response.headers,
+ error=e,
model=model.name,
messages=messages,
temperature=temperature,
diff --git a/vibe/core/llm/backend/reasoning_adapter.py b/vibe/core/llm/backend/reasoning_adapter.py
index b794f2a..5b44bcb 100644
--- a/vibe/core/llm/backend/reasoning_adapter.py
+++ b/vibe/core/llm/backend/reasoning_adapter.py
@@ -6,7 +6,7 @@ from typing import Any, ClassVar
from vibe.core.config import ProviderConfig
from vibe.core.llm.backend.base import APIAdapter, PreparedRequest
-from vibe.core.llm.message_utils import merge_consecutive_user_messages
+from vibe.core.llm.message_utils import merge_consecutive_user_messages, strip_reasoning
from vibe.core.types import (
AvailableTool,
FunctionCall,
@@ -107,13 +107,6 @@ class ReasoningAdapter(APIAdapter):
return payload
- def _strip_reasoning(self, msg: LLMMessage) -> LLMMessage:
- if msg.role != Role.assistant or not msg.reasoning_content:
- return msg
- return msg.model_copy(
- update={"reasoning_content": None, "reasoning_signature": None}
- )
-
def prepare_request( # noqa: PLR0913
self,
*,
@@ -130,7 +123,7 @@ class ReasoningAdapter(APIAdapter):
) -> PreparedRequest:
merged_messages = merge_consecutive_user_messages(messages)
if thinking == "off":
- merged_messages = [self._strip_reasoning(msg) for msg in merged_messages]
+ merged_messages = [strip_reasoning(msg) for msg in merged_messages]
converted_messages = [self._convert_message(msg) for msg in merged_messages]
payload = self._build_payload(
diff --git a/vibe/core/llm/exceptions.py b/vibe/core/llm/exceptions.py
index ae2da6c..b72010d 100644
--- a/vibe/core/llm/exceptions.py
+++ b/vibe/core/llm/exceptions.py
@@ -6,10 +6,13 @@ import json
from typing import Any
import httpx
+from mistralai.client.errors import SDKError
from pydantic import BaseModel, ConfigDict, ValidationError
from vibe.core.types import AvailableTool, LLMMessage, StrToolChoice
+type HttpError = SDKError | httpx.HTTPStatusError
+
class ErrorDetail(BaseModel):
model_config = ConfigDict(extra="ignore")
@@ -111,25 +114,22 @@ class BackendErrorBuilder:
*,
provider: str,
endpoint: str,
- response: httpx.Response,
- headers: Mapping[str, str] | None,
+ error: HttpError,
model: str,
messages: Sequence[LLMMessage],
temperature: float,
has_tools: bool,
tool_choice: StrToolChoice | AvailableTool | None,
) -> BackendError:
- try:
- body_text = response.text
- except Exception: # On streaming responses, we can't read the body
- body_text = None
+ response = error.raw_response if isinstance(error, SDKError) else error.response
+ body_text = cls._read_response_body(response, error)
return BackendError(
provider=provider,
endpoint=endpoint,
status=response.status_code,
reason=response.reason_phrase,
- headers=headers or {},
+ headers=response.headers,
body_text=body_text,
parsed_error=cls._parse_provider_error(body_text),
model=model,
@@ -165,6 +165,17 @@ class BackendErrorBuilder:
),
)
+ @staticmethod
+ def _read_response_body(response: httpx.Response, error: HttpError) -> str | None:
+ try:
+ response.read()
+ return response.text
+ except Exception:
+ pass
+ if body := getattr(error, "body", None):
+ return body
+ return str(error)
+
@staticmethod
def _parse_provider_error(body_text: str | None) -> str | None:
if not body_text:
diff --git a/vibe/core/llm/message_utils.py b/vibe/core/llm/message_utils.py
index 99d46f4..b58b013 100644
--- a/vibe/core/llm/message_utils.py
+++ b/vibe/core/llm/message_utils.py
@@ -5,6 +5,14 @@ from collections.abc import Sequence
from vibe.core.types import LLMMessage, Role
+def strip_reasoning(msg: LLMMessage) -> LLMMessage:
+ if msg.role != Role.assistant or not msg.reasoning_content:
+ return msg
+ return msg.model_copy(
+ update={"reasoning_content": None, "reasoning_signature": None}
+ )
+
+
def merge_consecutive_user_messages(messages: Sequence[LLMMessage]) -> list[LLMMessage]:
"""Merge consecutive user messages into a single message.
diff --git a/vibe/core/rewind/__init__.py b/vibe/core/rewind/__init__.py
new file mode 100644
index 0000000..bb6d913
--- /dev/null
+++ b/vibe/core/rewind/__init__.py
@@ -0,0 +1,10 @@
+from __future__ import annotations
+
+from vibe.core.rewind.manager import (
+ Checkpoint,
+ FileSnapshot,
+ RewindError,
+ RewindManager,
+)
+
+__all__ = ["Checkpoint", "FileSnapshot", "RewindError", "RewindManager"]
diff --git a/vibe/core/rewind/manager.py b/vibe/core/rewind/manager.py
new file mode 100644
index 0000000..2a77e19
--- /dev/null
+++ b/vibe/core/rewind/manager.py
@@ -0,0 +1,192 @@
+from __future__ import annotations
+
+from collections.abc import Awaitable, Callable, Sequence
+from dataclasses import dataclass, field
+import os
+from pathlib import Path
+
+from vibe.core.logger import logger
+from vibe.core.types import LLMMessage, MessageList, Role
+
+
+class RewindError(Exception):
+ """Raised when a rewind operation fails."""
+
+
+@dataclass(frozen=True, slots=True)
+class FileSnapshot:
+ """Snapshot of a single file's content at a point in time.
+
+ content is None if the file did not exist (was created after the snapshot).
+ """
+
+ path: str
+ content: bytes | None
+
+
+@dataclass
+class Checkpoint:
+ """Snapshot of tracked files taken before a user message."""
+
+ message_index: int
+ files: list[FileSnapshot] = field(default_factory=list)
+
+
+class RewindManager:
+ """Manages conversation rewind: file snapshots, message truncation, and session forking."""
+
+ def __init__(
+ self,
+ messages: MessageList,
+ save_messages: Callable[[], Awaitable[None]],
+ reset_session: Callable[[], None],
+ ) -> None:
+ self._checkpoints: list[Checkpoint] = []
+ self._messages = messages
+ self._save_messages = save_messages
+ self._reset_session = reset_session
+ self._is_rewinding = False
+ self._messages.on_reset(self._on_messages_reset)
+
+ # -- Checkpoint management -------------------------------------------------
+
+ @property
+ def checkpoints(self) -> list[Checkpoint]:
+ return list(self._checkpoints)
+
+ def create_checkpoint(self) -> None:
+ """Snapshot known files and start a new checkpoint at the current message position.
+
+ Files known from the previous checkpoint are re-read from disk so
+ that each checkpoint captures the actual state at that point in time.
+ """
+ files: list[FileSnapshot] = []
+ if self._checkpoints:
+ for snap in self._checkpoints[-1].files:
+ files.append(self._read_snapshot(snap.path))
+ self._checkpoints.append(
+ Checkpoint(message_index=len(self._messages), files=files)
+ )
+
+ def add_snapshot(self, snapshot: FileSnapshot) -> None:
+ """Record a file snapshot into every checkpoint that doesn't have it yet."""
+ for cp in self._checkpoints:
+ if all(s.path != snapshot.path for s in cp.files):
+ cp.files.append(snapshot)
+
+ def has_file_changes_at(self, message_index: int) -> bool:
+ """Check if files have changed since the checkpoint at *message_index*."""
+ checkpoint = self._get_checkpoint(message_index)
+ if checkpoint is None:
+ return False
+ return self._has_changes_since(checkpoint)
+
+ # -- Rewind operations -----------------------------------------------------
+
+ def get_rewindable_messages(self) -> list[tuple[int, str]]:
+ """Return (message_index, content) for each user message."""
+ return [
+ (i, msg.content or "")
+ for i, msg in enumerate(self._messages)
+ if msg.role == Role.user and msg.content and not msg.injected
+ ]
+
+ async def rewind_to_message(
+ self, message_index: int, *, restore_files: bool
+ ) -> tuple[str, list[str]]:
+ """Rewind the session to the given user message index.
+
+ Saves the current session, truncates messages, optionally restores
+ files, and forks to a new session.
+
+ Returns a tuple of (message_content, restore_errors).
+
+ Raises:
+ RewindError: If the message index is invalid or not a user message.
+ """
+ messages: Sequence[LLMMessage] = self._messages
+ if message_index < 0 or message_index >= len(messages):
+ raise RewindError(f"Invalid message index: {message_index}")
+
+ user_msg = messages[message_index]
+ if user_msg.role != Role.user:
+ raise RewindError(f"Message at index {message_index} is not a user message")
+
+ message_content = user_msg.content or ""
+ restore_errors: list[str] = []
+
+ if restore_files:
+ checkpoint = self._get_checkpoint(message_index)
+ if checkpoint:
+ restore_errors = self._restore_checkpoint(checkpoint)
+
+ await self._save_messages()
+ self._checkpoints = [
+ cp for cp in self._checkpoints if cp.message_index < message_index
+ ]
+ self._is_rewinding = True
+ try:
+ self._messages.reset(list(messages[:message_index]))
+ finally:
+ self._is_rewinding = False
+ self._reset_session()
+
+ return message_content, restore_errors
+
+ # -- Private helpers -------------------------------------------------------
+
+ def _get_checkpoint(self, message_index: int) -> Checkpoint | None:
+ for cp in self._checkpoints:
+ if cp.message_index == message_index:
+ return cp
+ return None
+
+ def _restore_checkpoint(self, checkpoint: Checkpoint) -> list[str]:
+ """Restore files on disk to match the checkpoint state.
+
+ Returns a list of human-readable error messages for files that
+ could not be restored (empty when everything succeeded).
+ """
+ errors: list[str] = []
+ for snap in checkpoint.files:
+ path = Path(snap.path)
+ if snap.content is None:
+ if path.exists():
+ try:
+ os.remove(path)
+ except Exception:
+ errors.append(f"Failed to delete file: {snap.path}")
+ else:
+ try:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ path.write_bytes(snap.content)
+ except Exception:
+ errors.append(f"Failed to restore file: {snap.path}")
+ return errors
+
+ @staticmethod
+ def _has_changes_since(checkpoint: Checkpoint) -> bool:
+ for snap in checkpoint.files:
+ try:
+ current: bytes | None = Path(snap.path).read_bytes()
+ except FileNotFoundError:
+ current = None
+ if current != snap.content:
+ return True
+ return False
+
+ @staticmethod
+ def _read_snapshot(path: str) -> FileSnapshot:
+ try:
+ content: bytes | None = Path(path).read_bytes()
+ except FileNotFoundError:
+ content = None
+ except Exception:
+ logger.warning("Failed to read file for checkpoint: %s", path)
+ content = None
+ return FileSnapshot(path=path, content=content)
+
+ def _on_messages_reset(self) -> None:
+ """Called when the message list is reset (session switch, clear, compact, etc.)."""
+ if not self._is_rewinding:
+ self._checkpoints.clear()
diff --git a/vibe/core/tools/base.py b/vibe/core/tools/base.py
index 43d7d52..cb9cc26 100644
--- a/vibe/core/tools/base.py
+++ b/vibe/core/tools/base.py
@@ -23,6 +23,8 @@ from typing import (
from pydantic import BaseModel, ConfigDict, Field, ValidationError
+from vibe.core.logger import logger
+from vibe.core.rewind.manager import FileSnapshot
from vibe.core.types import ToolStreamEvent
from vibe.core.utils.io import read_safe
@@ -355,6 +357,30 @@ class BaseTool[
"""
return None
+ def get_file_snapshot(self, args: ToolArgs) -> FileSnapshot | None:
+ """Return a snapshot of the file this tool is about to modify.
+
+ Called before ``run()`` so the checkpoint system can capture
+ the file's state *before* the tool writes to it.
+ Override in tools that modify files on disk.
+ """
+ return None
+
+ @staticmethod
+ def get_file_snapshot_for_path(path: str) -> FileSnapshot:
+ file_path = Path(path).expanduser()
+ if not file_path.is_absolute():
+ file_path = Path.cwd() / file_path
+ file_path = file_path.resolve()
+ try:
+ content: bytes | None = file_path.read_bytes()
+ except FileNotFoundError:
+ content = None
+ except Exception:
+ logger.warning("Failed to read file for tool snapshot: %s", file_path)
+ content = None
+ return FileSnapshot(path=str(file_path), content=content)
+
def get_result_extra(self, result: ToolResult) -> str | None:
"""Optional extra context appended to the result text sent to the LLM.
diff --git a/vibe/core/tools/builtins/search_replace.py b/vibe/core/tools/builtins/search_replace.py
index b797f4c..4238c3d 100644
--- a/vibe/core/tools/builtins/search_replace.py
+++ b/vibe/core/tools/builtins/search_replace.py
@@ -10,6 +10,7 @@ from typing import ClassVar, NamedTuple, final
import anyio
from pydantic import BaseModel, Field
+from vibe.core.rewind.manager import FileSnapshot
from vibe.core.tools.base import (
BaseTool,
BaseToolConfig,
@@ -110,6 +111,9 @@ class SearchReplace(
def get_status_text(cls) -> str:
return "Editing files"
+ def get_file_snapshot(self, args: SearchReplaceArgs) -> FileSnapshot | None:
+ return self.get_file_snapshot_for_path(args.file_path)
+
def resolve_permission(self, args: SearchReplaceArgs) -> PermissionContext | None:
return resolve_file_tool_permission(
args.file_path,
diff --git a/vibe/core/tools/builtins/write_file.py b/vibe/core/tools/builtins/write_file.py
index 99d3f85..e6a7c19 100644
--- a/vibe/core/tools/builtins/write_file.py
+++ b/vibe/core/tools/builtins/write_file.py
@@ -7,6 +7,7 @@ from typing import ClassVar, final
import anyio
from pydantic import BaseModel, Field
+from vibe.core.rewind.manager import FileSnapshot
from vibe.core.tools.base import (
BaseTool,
BaseToolConfig,
@@ -75,6 +76,9 @@ class WriteFile(
def get_status_text(cls) -> str:
return "Writing file"
+ def get_file_snapshot(self, args: WriteFileArgs) -> FileSnapshot | None:
+ return self.get_file_snapshot_for_path(args.path)
+
def resolve_permission(self, args: WriteFileArgs) -> PermissionContext | None:
return resolve_file_tool_permission(
args.path,
diff --git a/vibe/core/types.py b/vibe/core/types.py
index a57f2fc..72f8dbb 100644
--- a/vibe/core/types.py
+++ b/vibe/core/types.py
@@ -215,6 +215,7 @@ class LLMMessage(BaseModel):
role: Role
content: Content | None = None
+ injected: bool = False
reasoning_content: Content | None = None
reasoning_signature: str | None = None
tool_calls: list[ToolCall] | None = None
@@ -440,6 +441,7 @@ class MessageList(Sequence[LLMMessage]):
) -> None:
self._data: list[LLMMessage] = list(initial) if initial else []
self._observer = observer
+ self._reset_hooks: list[Callable[[], None]] = []
self._silent = False
if self._observer:
for msg in self._data:
@@ -460,9 +462,19 @@ class MessageList(Sequence[LLMMessage]):
for msg in msgs:
self.append(msg)
+ def on_reset(self, hook: Callable[[], None]) -> None:
+ """Register a callback that fires whenever the list is reset."""
+ self._reset_hooks.append(hook)
+
def reset(self, new: list[LLMMessage]) -> None:
"""Replace contents silently (never notifies)."""
self._data = list(new)
+ for hook in self._reset_hooks:
+ hook()
+
+ def update_system_prompt(self, new: str) -> None:
+ """Update the system prompt in place."""
+ self._data[0] = LLMMessage(role=Role.system, content=new)
@contextmanager
def silent(self) -> Iterator[None]:
diff --git a/vibe/whats_new.md b/vibe/whats_new.md
index 3157be0..7f0ddd2 100644
--- a/vibe/whats_new.md
+++ b/vibe/whats_new.md
@@ -1,4 +1,2 @@
-# What's new in v2.6.0
-- **Text-to-speech**: Added TTS functionality
-- **Standalone resume**: New --resume command for session picker
-- **Fine-grained permissions**: Improved permissions granularity and persistence
+# What's new in v2.7.0
+- **Rewind mode**: Added navigation and forking of conversation history with /rewind