mirror of
https://github.com/mistralai/mistral-vibe
synced 2026-04-25 17:14:55 +02:00
Co-authored-by: Clément Sirieix <clement.sirieix@mistral.ai> Co-authored-by: Vincent G <10739306+VinceOPS@users.noreply.github.com> Co-authored-by: Mistral Vibe <vibe@mistral.ai>
66 lines
2.1 KiB
Python
66 lines
2.1 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
from textual.reactive import reactive
|
|
from textual.timer import Timer
|
|
from textual.widgets import Static
|
|
|
|
from vibe.cli.narrator_manager.narrator_manager_port import (
|
|
NarratorManagerListener,
|
|
NarratorManagerPort,
|
|
NarratorState,
|
|
)
|
|
|
|
# Single-character frames cycled (one per tick) while the narrator is summarizing.
SHRINK_FRAMES = "█▇▆▅▄▃▂▁"

# Three-bar frames cycled (one per tick) while the narrator is speaking.
BAR_FRAMES = [
    "▂▅▇",
    "▃▆▅",
    "▅▃▇",
    "▇▂▅",
    "▅▇▃",
    "▃▅▆",
]

# Interval handed to ``set_interval`` between animation ticks.
ANIMATION_INTERVAL = 0.15
|
|
|
|
|
|
class NarratorStatus(NarratorManagerListener, Static):
    """Widget that displays the narrator's current state as a text animation.

    While mounted, the widget is registered as a listener on the narrator
    manager it was given. State changes reported by the manager are stored in
    the reactive ``state`` attribute; the watcher then clears the widget when
    idle, or starts a repeating animation with an "esc to stop" hint while
    summarizing or speaking.
    """

    state = reactive(NarratorState.IDLE)

    def __init__(self, narrator_manager: NarratorManagerPort, **kwargs: Any) -> None:
        """Create the widget with empty content.

        Args:
            narrator_manager: Manager this widget subscribes to on mount and
                unsubscribes from on unmount.
            **kwargs: Forwarded unchanged to the parent constructor.
        """
        super().__init__("", **kwargs)
        self._frame: int = 0
        self._timer: Timer | None = None
        self._narrator_manager = narrator_manager

    def on_mount(self) -> None:
        """Subscribe to narrator state changes once the widget is mounted."""
        self._narrator_manager.add_listener(self)

    def on_unmount(self) -> None:
        """Unsubscribe from the narrator manager when the widget is unmounted."""
        self._narrator_manager.remove_listener(self)

    def on_narrator_state_change(self, state: NarratorState) -> None:
        """Listener callback: store the new state in the reactive attribute."""
        self.state = state

    def watch_state(self, new_state: NarratorState) -> None:
        """Stop any running animation, then redraw for ``new_state``.

        Idle clears the widget. Summarizing and speaking restart the animation
        from its first frame, drawing one frame immediately before the
        repeating timer takes over. Any other state only stops the timer.
        """
        self._stop_timer()

        if new_state == NarratorState.IDLE:
            self.update("")
        elif (
            new_state == NarratorState.SUMMARIZING
            or new_state == NarratorState.SPEAKING
        ):
            self._frame = 0
            # Draw the first frame right away instead of waiting one interval.
            self._tick()
            self._timer = self.set_interval(ANIMATION_INTERVAL, self._tick)

    def _tick(self) -> None:
        """Render the frame for the current state and advance the frame counter."""
        current = self.state
        index = self._frame

        if current == NarratorState.SUMMARIZING:
            glyph = SHRINK_FRAMES[index % len(SHRINK_FRAMES)]
            self.update(
                f"[bold orange]{glyph}[/bold orange] summarizing [dim]esc to stop[/dim]"
            )
        elif current == NarratorState.SPEAKING:
            glyph = BAR_FRAMES[index % len(BAR_FRAMES)]
            self.update(
                f"[bold orange]{glyph}[/bold orange] speaking [dim]esc to stop[/dim]"
            )

        # The counter advances on every tick, whatever the state.
        self._frame += 1

    def _stop_timer(self) -> None:
        """Stop and forget the animation timer, if one is running."""
        active = self._timer
        if active is None:
            return
        active.stop()
        self._timer = None
|