cc-slim/src/cc_slim/main.py
2026-04-12 00:46:49 +08:00

191 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from pathlib import Path
from typing import Optional
import typer
from rich.console import Console
from rich.table import Table
from cc_slim.commands import handle_command, parse_command
from cc_slim.engine import Agent, resolve_config
from cc_slim.memory import MemoryStore
from cc_slim.mode import ModeState
from cc_slim.permissions import PermissionChecker
from cc_slim.session import SessionStore
from cc_slim.tools import build_default_tools
# Typer application object that the `run` command below is registered on.
# Both keyword flags are passed explicitly as False: no completion options are
# added, and invoking with no arguments does not short-circuit to help output.
app = typer.Typer(add_completion=False, no_args_is_help=False)
# Module-level Rich console shared by the rendering helpers and the REPL.
console = Console()
def render_stream(agent: Agent, user_input: str) -> None:
    """Stream one agent reply to the console, event by event.

    Iterates ``agent.stream_reply(user_input)`` and renders each event
    according to its ``"type"`` key:

    * ``"text"``: printed without a trailing newline so chunks join up.
    * ``"tool_call"``: printed on its own line as ``-> name(input)``.
    * ``"tool_result"``: printed in red when the output starts with one of
      the known refusal prefixes, otherwise as a short green "done" line.
    * ``"error"``: printed in red, then rendering stops.
    * ``"done"``: a final newline is printed, then rendering stops.

    Events with any other type are ignored.

    Text taken from events (model output, tool names, tool input/output,
    error messages) is never interpreted as Rich markup: free text is printed
    with ``markup=False`` and interpolated values are escaped. Otherwise
    bracketed content such as ``[bold]`` would be swallowed as styling and an
    unmatched ``[/tag]`` would raise ``rich.errors.MarkupError``.
    """
    # Local import keeps this block self-contained; rich is already a
    # dependency of this module.
    from rich.markup import escape

    # Prefixes of tool outputs that mean the tool was refused, not executed.
    refusal_prefixes = (
        "当前处于 plan 模式,禁止执行工具:",
        "当前未批准工具执行:",
        "当前命令疑似指向工作区外路径,已拒绝执行 Bash",
    )

    # True while the cursor sits at the end of an unterminated text chunk.
    printed_text = False
    for event in agent.stream_reply(user_input):
        kind = event["type"]
        if kind == "text":
            console.print(event["content"], end="", markup=False)
            printed_text = True
        elif kind == "tool_call":
            if printed_text:
                # Finish the pending text line before the tool-call line.
                console.print()
                printed_text = False
            call = escape(f"{event['name']}({event['input']})")
            console.print(f"[cyan]->[/cyan] {call}")
        elif kind == "tool_result":
            output = str(event.get("output", ""))
            if output.startswith(refusal_prefixes):
                console.print(f"[red]{escape(output)}[/red]")
            else:
                tool_name = escape(str(event["name"]))
                console.print(f"[green]✓[/green] {tool_name} done")
        elif kind == "error":
            if printed_text:
                console.print()
            message = escape(str(event["message"]))
            console.print(f"[red]error:[/red] {message}")
            return
        elif kind == "done":
            console.print()
            return
def render_history(store: SessionStore) -> None:
    """Print a table of the sessions returned by ``store.list_sessions()``.

    When the store reports no sessions, a one-line notice is printed instead.
    Each row shows a 1-based index followed by the record's ``session_id``,
    ``title``, ``updated_at`` and ``message_count`` values; a missing key
    falls back to an empty string (``0`` for the count).
    """
    entries = store.list_sessions()
    if not entries:
        console.print("当前工作目录没有历史 session。")
        return

    # (key in the session record, fallback used when the key is absent)
    fields = (
        ("session_id", ""),
        ("title", ""),
        ("updated_at", ""),
        ("message_count", 0),
    )

    listing = Table(show_header=True)
    for heading in ("#", "session_id", "title", "updated_at", "count"):
        listing.add_column(heading)

    for position, entry in enumerate(entries, start=1):
        cells = [str(entry.get(key, fallback)) for key, fallback in fields]
        listing.add_row(str(position), *cells)

    console.print(listing)
def build_agent(
    root: Path,
    config: object,
    store: SessionStore,
    permissions: PermissionChecker,
    session_meta: dict[str, object],
    restored_history: list[dict[str, object]] | None = None,
) -> Agent:
    """Construct an ``Agent`` wired to the given workspace and session.

    Args:
        root: Workspace directory; passed as ``workspace`` and used to build
            the default tool set.
        config: Resolved configuration object handed to the agent unchanged.
        store: Session store the agent receives as ``session_store``.
        permissions: Permission checker the agent receives.
        session_meta: Session metadata; its ``"session_id"`` entry is
            stringified and used as the agent's session id.
        restored_history: Previously saved messages to seed the agent with.
            ``None`` or an empty list results in an empty history.

    Returns:
        The new ``Agent``, using the module-level ``confirm_tool`` as its
        confirmation callback.

    Raises:
        KeyError: If ``session_meta`` has no ``"session_id"`` key.
    """
    toolset = build_default_tools(root)
    session_key = str(session_meta["session_id"])
    prior_messages = restored_history if restored_history else []
    return Agent(
        config=config,
        tools=toolset,
        workspace=root,
        session_store=store,
        session_id=session_key,
        history=prior_messages,
        permission_checker=permissions,
        confirm_tool=confirm_tool,
    )
def confirm_tool(name: str, payload: dict[str, object]) -> bool:
    """Ask the user on the terminal whether a tool call may run.

    Args:
        name: Tool name shown in the prompt.
        payload: Tool input shown in the prompt.

    Returns:
        ``True`` only when the answer, stripped and lower-cased, is ``y`` or
        ``yes``. Any other answer, including an empty line, returns ``False``.
        A closed or non-interactive stdin (``EOFError``) also returns
        ``False``, matching the ``[y/N]`` default shown in the prompt.
    """
    # markup=False: the name and payload may contain square brackets that
    # Rich would otherwise try to parse as style tags.
    console.print(f"即将执行 [{name}] {payload},是否允许? [y/N]", markup=False, end=" ")
    try:
        answer = input()
    except EOFError:
        # No answer can be read; terminate the prompt line and deny.
        console.print()
        return False
    return answer.strip().lower() in {"y", "yes"}
@app.command()
def run(
    prompt: Optional[str] = typer.Argument(None, help="单次执行的用户输入"),
    provider: Optional[str] = typer.Option(None, help="模型提供方openai 或 anthropic"),
    model: Optional[str] = typer.Option(None, help="模型名称"),
    api_key: Optional[str] = typer.Option(None, help="API Key优先级最高"),
    base_url: Optional[str] = typer.Option(None, help="可选的 API Base URL"),
    cwd: Path = typer.Option(Path("."), help="工作区根目录"),
    max_turns: Optional[int] = typer.Option(None, help="最大工具循环轮数"),
    history: bool = typer.Option(False, "--history", help="列出当前工作目录的历史 session"),
    resume: Optional[str] = typer.Option(None, "--resume", help="按 session id 或序号恢复历史 session"),
    auto_approve: bool = typer.Option(False, "--auto-approve", help="跳过高风险工具确认"),
) -> None:
    # CLI entry point. Documented with comments rather than a docstring on
    # purpose: Typer would surface a docstring as the command's --help text.
    #
    # Flow:
    #   1. Resolve the workspace and create the session/memory stores, the
    #      mode state and the permission checker.
    #   2. With --history, print the session table and exit.
    #   3. Resolve the configuration from the CLI overrides, then either
    #      resume an existing session (--resume) or create a new one.
    #   4. With a positional prompt, stream a single reply and exit;
    #      otherwise enter the interactive REPL.

    # Local import keeps this block self-contained; rich is already a
    # dependency of this module.
    from rich.markup import escape

    root = cwd.resolve()
    store = SessionStore(root)
    memory = MemoryStore(root)
    mode = ModeState()
    permissions = PermissionChecker(root, auto_approve=auto_approve)
    permissions.set_mode(mode.mode)

    if history:
        render_history(store)
        return

    config = resolve_config(
        root,
        {
            "provider": provider,
            "model": model,
            "api_key": api_key,
            "base_url": base_url,
            "max_turns": max_turns,
        },
    )

    session_meta: dict[str, object]
    restored_history: list[dict[str, object]] = []
    if resume:
        session_id = store.resolve_session_id(resume)
        session_meta = store.load_meta(session_id)
        restored_history = store.load_messages(session_id)
        console.print(f"已恢复 session: {session_meta.get('session_id')}")
    else:
        session_meta = store.create_session(config.model)

    agent = build_agent(root, config, store, permissions, session_meta, restored_history)

    if prompt:
        # One-shot mode: answer the single prompt and exit.
        render_stream(agent, prompt)
        return

    console.print("[bold cyan]cc-slim[/bold cyan] REPL输入 /help 查看命令,输入 exit 或 quit 退出。")
    while True:
        try:
            user_input = typer.prompt(f"[{mode.mode}] >")
        except (typer.Abort, EOFError, KeyboardInterrupt):
            # typer.prompt delegates to Click, which turns Ctrl-C / Ctrl-D
            # into Abort; catching only EOFError/KeyboardInterrupt would let
            # the abort escape and end the program with "Aborted!".
            console.print()
            break

        if user_input.strip().lower() in {"exit", "quit"}:
            break
        if not user_input.strip():
            continue

        if parse_command(user_input):
            try:
                # A command may hand back a replacement agent.
                agent = handle_command(
                    user_input,
                    console=console,
                    root=root,
                    config=config,
                    store=store,
                    memory=memory,
                    mode=mode,
                    permissions=permissions,
                    agent=agent,
                    build_agent=build_agent,
                    render_history=render_history,
                )
            except Exception as exc:  # pragma: no cover
                # Escape the message so brackets in it are not parsed as
                # Rich markup while reporting the failure.
                console.print(f"[red]error:[/red] {escape(str(exc))}")
            # Slash commands are never forwarded to the model.
            continue

        try:
            render_stream(agent, user_input)
        except Exception as exc:  # pragma: no cover
            console.print(f"[red]error:[/red] {escape(str(exc))}")
# Run the Typer application when this module is executed as a script.
if __name__ == "__main__":
    app()