Compare commits
8 Commits
54191f8458
...
e9cf2e3533
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9cf2e3533 | ||
|
|
192e6a56da | ||
|
|
c9b545538e | ||
|
|
04ebf6bb2f | ||
|
|
103725e652 | ||
|
|
67f6467da0 | ||
|
|
587331cbb8 | ||
|
|
1d66019529 |
2
.gitignore
vendored
2
.gitignore
vendored
@ -14,7 +14,7 @@ venv/
|
||||
*.log
|
||||
logs/
|
||||
.tmp/
|
||||
|
||||
test/
|
||||
# IDE
|
||||
.vscode/
|
||||
.idea/
|
||||
|
||||
24
AGENTS.md
24
AGENTS.md
@ -21,6 +21,16 @@
|
||||
4. 使用工具后重新判断结果。
|
||||
5. 持续循环,直到得到最终答案或出现必须由用户补充的信息。
|
||||
|
||||
当前 harness 是极简实现,优先最小动作,不做不必要的重复试错。
|
||||
|
||||
行动时必须以运行时注入的环境信息为准,特别是平台、shell、工作目录和可用工具列表。
|
||||
|
||||
REPL 内的会话管理命令优先使用 slash command。
|
||||
|
||||
`/history`、`/resume`、`/new` 由程序直接处理,不进入 agent loop。
|
||||
|
||||
当前项目支持最小 memory:使用 `/remember` 保存长期信息,使用 `/memory` 查看。
|
||||
|
||||
未明确说明时,使用以下默认值:
|
||||
|
||||
- 工作目录:当前进程目录
|
||||
@ -33,8 +43,18 @@
|
||||
## 工具使用规则
|
||||
|
||||
- `Read`:用于读取文件内容。
|
||||
- `Glob`:用于按模式查找文件。
|
||||
- 需要按文件名或路径模式查找时,优先使用 `Glob`。
|
||||
- 需要搜索文件内容时,优先使用 `Grep`。
|
||||
- 修改已有文件内容时,优先使用 `Edit` 工具。
|
||||
- 创建新文件时,优先使用 `Write` 工具。
|
||||
- `Bash`:用于执行必须通过 shell 完成的最小命令。
|
||||
- 只有在确实需要复杂 shell 特性时才使用 Bash。
|
||||
- 不要用 Bash 拼接文件内容。
|
||||
- Windows 环境下优先使用兼容写法,不默认使用 `cat <<EOF`、`ls -la` 等 Unix 风格写法。
|
||||
- 同一写入或创建目标,最多尝试 2 种不同 Bash 方案。
|
||||
- 如果两次 Bash 方案失败,应立即收敛:
|
||||
- 先检查路径、文件状态、shell 兼容性。
|
||||
- 如仍不确定,则询问用户,而不是继续盲试。
|
||||
- 不能虚构工具输出。
|
||||
- 不能在未验证前声称文件存在、命令成功或修改已生效。
|
||||
|
||||
@ -42,5 +62,5 @@
|
||||
|
||||
- 对不确定性保持诚实。
|
||||
- 需要时引用具体文件或命令。
|
||||
- 默认保持简短,除非用户要求展开。
|
||||
- 输出保持简短直接,不夸大成功状态。
|
||||
- 若受阻,只询问当前缺失的关键信息。
|
||||
|
||||
@ -20,6 +20,12 @@
|
||||
- 重构时:
|
||||
- 先找全部用法
|
||||
- 避免修改无关文件
|
||||
- 搜索文件内容时,优先使用 `Grep`,而不是 `Bash`
|
||||
- 修改已有文件时,优先使用 `Edit`,而不是 `Bash` 或 `Write`
|
||||
- 需要创建或覆盖文件时,优先使用 `Write` 工具,而不是 `Bash`
|
||||
- 会话管理优先使用 slash command,而不是自然语言或 `Bash` 探查 session 文件
|
||||
- 长期有价值的项目约束或偏好,优先使用 `/remember` 保存
|
||||
- Windows 下先验证 shell 兼容性,再选择命令写法
|
||||
|
||||
## 沟通风格
|
||||
|
||||
|
||||
@ -24,10 +24,13 @@ packages = ["src/cc_slim"]
|
||||
[tool.cc_slim]
|
||||
provider = "openai"
|
||||
model = "gpt-4.1-mini"
|
||||
max_turns = 8
|
||||
max_turns = 12
|
||||
|
||||
[dependency-groups]
|
||||
dev = [
|
||||
"pytest>=9.0.3",
|
||||
"ruff>=0.15.10",
|
||||
]
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
testpaths = ["tests"]
|
||||
|
||||
@ -2,14 +2,19 @@ from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import platform
|
||||
import sys
|
||||
import tomllib
|
||||
from dataclasses import dataclass
|
||||
from json import JSONDecodeError
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from typing import Any, Iterator
|
||||
|
||||
from anthropic import Anthropic
|
||||
from openai import OpenAI
|
||||
|
||||
from cc_slim.memory import MemoryStore
|
||||
from cc_slim.session import SessionStore
|
||||
from cc_slim.tools import Tool
|
||||
|
||||
|
||||
@ -19,7 +24,7 @@ class Config:
|
||||
model: str
|
||||
api_key: str
|
||||
base_url: str | None
|
||||
max_turns: int = 8
|
||||
max_turns: int = 12
|
||||
|
||||
|
||||
def resolve_config(workspace: Path, cli: dict[str, Any]) -> Config:
|
||||
@ -39,7 +44,7 @@ def resolve_config(workspace: Path, cli: dict[str, Any]) -> Config:
|
||||
"",
|
||||
)
|
||||
base_url = _pick(cli.get("base_url"), os.getenv("CC_SLIM_BASE_URL"), file_cfg.get("base_url"), None)
|
||||
max_turns_raw = _pick(cli.get("max_turns"), os.getenv("CC_SLIM_MAX_TURNS"), file_cfg.get("max_turns"), 8)
|
||||
max_turns_raw = _pick(cli.get("max_turns"), os.getenv("CC_SLIM_MAX_TURNS"), file_cfg.get("max_turns"), 12)
|
||||
|
||||
if not api_key:
|
||||
raise ValueError("缺少 API key,请通过 CLI、环境变量或 .cc-slim.toml 提供。")
|
||||
@ -54,25 +59,58 @@ def resolve_config(workspace: Path, cli: dict[str, Any]) -> Config:
|
||||
|
||||
|
||||
class Agent:
|
||||
def __init__(self, config: Config, tools: list[Tool], workspace: Path) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
config: Config,
|
||||
tools: list[Tool],
|
||||
workspace: Path,
|
||||
session_store: SessionStore | None = None,
|
||||
session_id: str | None = None,
|
||||
history: list[dict[str, Any]] | None = None,
|
||||
) -> None:
|
||||
self.config = config
|
||||
self.tools = {tool.name: tool for tool in tools}
|
||||
self.history: list[dict[str, Any]] = []
|
||||
self.history: list[dict[str, Any]] = list(history or [])
|
||||
self.session_store = session_store
|
||||
self.session_id = session_id
|
||||
self.system_prompt = self._build_system_prompt(workspace)
|
||||
self.client = self._build_client()
|
||||
|
||||
def reply(self, user_input: str) -> str:
|
||||
self.history.append({"role": "user", "content": user_input})
|
||||
parts: list[str] = []
|
||||
for event in self.stream_reply(user_input):
|
||||
if event["type"] == "text":
|
||||
parts.append(event["content"])
|
||||
elif event["type"] == "error":
|
||||
raise RuntimeError(event["message"])
|
||||
return "".join(parts).strip() or "(empty response)"
|
||||
|
||||
def stream_reply(self, user_input: str) -> Iterator[dict[str, Any]]:
|
||||
user_message = {"role": "user", "content": user_input}
|
||||
self.history.append(user_message)
|
||||
self._save_message(user_message)
|
||||
|
||||
for _ in range(self.config.max_turns):
|
||||
result = self._call_model()
|
||||
try:
|
||||
if self.config.provider == "openai":
|
||||
result = yield from self._stream_openai()
|
||||
else:
|
||||
result = yield from self._stream_anthropic()
|
||||
except Exception as exc:
|
||||
yield {"type": "error", "message": str(exc)}
|
||||
return
|
||||
|
||||
self.history.append(result["assistant"])
|
||||
self._save_message(result["assistant"])
|
||||
|
||||
if not result["tool_calls"]:
|
||||
return result["text"].strip() or "(empty response)"
|
||||
yield {"type": "done"}
|
||||
return
|
||||
|
||||
for call in result["tool_calls"]:
|
||||
yield {"type": "tool_call", "name": call["name"], "input": call["input"]}
|
||||
tool_output = self._run_tool(call["name"], call["input"])
|
||||
yield {"type": "tool_result", "name": call["name"], "output": tool_output}
|
||||
self.history.append(
|
||||
{
|
||||
"role": "tool",
|
||||
@ -81,8 +119,9 @@ class Agent:
|
||||
"content": tool_output,
|
||||
}
|
||||
)
|
||||
self._save_message(self.history[-1])
|
||||
|
||||
return "已达到最大工具循环轮数,停止执行。"
|
||||
yield {"type": "error", "message": "已达到最大工具循环轮数,停止执行。"}
|
||||
|
||||
def _build_client(self) -> Any:
|
||||
if self.config.provider == "openai":
|
||||
@ -99,6 +138,8 @@ class Agent:
|
||||
|
||||
def _build_system_prompt(self, workspace: Path) -> str:
|
||||
parts: list[str] = []
|
||||
parts.append(self._build_runtime_summary(workspace))
|
||||
|
||||
agents = workspace / "AGENTS.md"
|
||||
if agents.exists():
|
||||
parts.append(agents.read_text(encoding="utf-8"))
|
||||
@ -108,12 +149,31 @@ class Agent:
|
||||
for path in sorted(skills_dir.glob("*.md"), key=lambda p: p.name):
|
||||
parts.append(path.read_text(encoding="utf-8"))
|
||||
|
||||
memory = MemoryStore(workspace).read()
|
||||
if memory:
|
||||
parts.append(f"# Memory\n\n{memory}")
|
||||
|
||||
return "\n\n".join(part.strip() for part in parts if part.strip())
|
||||
|
||||
def _call_model(self) -> dict[str, Any]:
|
||||
if self.config.provider == "openai":
|
||||
return self._call_openai()
|
||||
return self._call_anthropic()
|
||||
def _build_runtime_summary(self, workspace: Path) -> str:
|
||||
tool_names = ", ".join(self.tools.keys()) or "(none)"
|
||||
shell_name = self._detect_shell()
|
||||
return "\n".join(
|
||||
[
|
||||
"## 运行环境",
|
||||
f"- 平台: {platform.system() or 'Unknown'}",
|
||||
f"- sys.platform: {sys.platform}",
|
||||
f"- shell: {shell_name}",
|
||||
f"- workspace: {workspace}",
|
||||
f"- 可用工具: {tool_names}",
|
||||
"- 行动时必须以以上运行环境信息为准,不要默认套用 Unix/Linux 命令习惯。",
|
||||
]
|
||||
)
|
||||
|
||||
def _detect_shell(self) -> str:
|
||||
if os.name == "nt":
|
||||
return os.getenv("COMSPEC", "Windows shell (likely PowerShell or cmd.exe)")
|
||||
return os.getenv("SHELL", "unknown shell")
|
||||
|
||||
def _call_openai(self) -> dict[str, Any]:
|
||||
response = self.client.chat.completions.create(
|
||||
@ -160,6 +220,51 @@ class Agent:
|
||||
assistant = {"role": "assistant", "content": content_blocks, "tool_calls": tool_calls}
|
||||
return {"assistant": assistant, "tool_calls": tool_calls, "text": "\n".join(text_parts)}
|
||||
|
||||
def _stream_openai(self) -> Iterator[dict[str, Any]]:
|
||||
stream = self.client.chat.completions.create(
|
||||
model=self.config.model,
|
||||
messages=self._openai_messages(),
|
||||
tools=self._openai_tools(),
|
||||
tool_choice="auto",
|
||||
stream=True,
|
||||
)
|
||||
|
||||
text_parts: list[str] = []
|
||||
tool_call_parts: dict[int, dict[str, Any]] = {}
|
||||
|
||||
for chunk in stream:
|
||||
choice = chunk.choices[0] if chunk.choices else None
|
||||
if choice is None:
|
||||
continue
|
||||
|
||||
delta = choice.delta
|
||||
if delta.content:
|
||||
text_parts.append(delta.content)
|
||||
yield {"type": "text", "content": delta.content}
|
||||
|
||||
for tool_delta in delta.tool_calls or []:
|
||||
slot = tool_call_parts.setdefault(
|
||||
tool_delta.index,
|
||||
{"id": "", "name": "", "arguments": ""},
|
||||
)
|
||||
if tool_delta.id:
|
||||
slot["id"] = tool_delta.id
|
||||
if tool_delta.function and tool_delta.function.name:
|
||||
slot["name"] = tool_delta.function.name
|
||||
if tool_delta.function and tool_delta.function.arguments:
|
||||
slot["arguments"] += tool_delta.function.arguments
|
||||
|
||||
tool_calls = [self._finalize_openai_tool_call(part) for _, part in sorted(tool_call_parts.items())]
|
||||
text = "".join(text_parts)
|
||||
assistant = {"role": "assistant", "content": text, "tool_calls": tool_calls}
|
||||
return {"assistant": assistant, "tool_calls": tool_calls, "text": text}
|
||||
|
||||
def _stream_anthropic(self) -> Iterator[dict[str, Any]]:
|
||||
result = self._call_anthropic()
|
||||
if result["text"]:
|
||||
yield {"type": "text", "content": result["text"]}
|
||||
return result
|
||||
|
||||
def _run_tool(self, name: str, payload: dict[str, Any]) -> str:
|
||||
tool = self.tools.get(name)
|
||||
if not tool:
|
||||
@ -248,6 +353,22 @@ class Agent:
|
||||
)
|
||||
return messages
|
||||
|
||||
def _finalize_openai_tool_call(self, part: dict[str, Any]) -> dict[str, Any]:
|
||||
arguments = part.get("arguments", "") or "{}"
|
||||
try:
|
||||
payload = json.loads(arguments)
|
||||
except JSONDecodeError as exc:
|
||||
raise ValueError(f"工具参数解析失败: {exc}") from exc
|
||||
return {
|
||||
"id": part.get("id") or f"call_{part.get('name', 'tool')}",
|
||||
"name": part.get("name") or "",
|
||||
"input": payload,
|
||||
}
|
||||
|
||||
def _save_message(self, message: dict[str, Any]) -> None:
|
||||
if self.session_store and self.session_id:
|
||||
self.session_store.append_message(self.session_id, message)
|
||||
|
||||
|
||||
def _load_file_config(path: Path) -> dict[str, Any]:
|
||||
if not path.exists():
|
||||
|
||||
@ -5,14 +5,126 @@ from typing import Optional
|
||||
|
||||
import typer
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
|
||||
from cc_slim.engine import Agent, resolve_config
|
||||
from cc_slim.memory import MemoryStore
|
||||
from cc_slim.session import SessionStore
|
||||
from cc_slim.tools import build_default_tools
|
||||
|
||||
app = typer.Typer(add_completion=False, no_args_is_help=False)
|
||||
console = Console()
|
||||
|
||||
|
||||
def render_stream(agent: Agent, user_input: str) -> None:
|
||||
printed_text = False
|
||||
for event in agent.stream_reply(user_input):
|
||||
if event["type"] == "text":
|
||||
console.print(event["content"], end="")
|
||||
printed_text = True
|
||||
elif event["type"] == "tool_call":
|
||||
if printed_text:
|
||||
console.print()
|
||||
printed_text = False
|
||||
console.print(f"[cyan]->[/cyan] {event['name']}({event['input']})")
|
||||
elif event["type"] == "tool_result":
|
||||
console.print(f"[green]✓[/green] {event['name']} done")
|
||||
elif event["type"] == "error":
|
||||
if printed_text:
|
||||
console.print()
|
||||
console.print(f"[red]error:[/red] {event['message']}")
|
||||
return
|
||||
elif event["type"] == "done":
|
||||
console.print()
|
||||
return
|
||||
|
||||
|
||||
def render_history(store: SessionStore) -> None:
|
||||
sessions = store.list_sessions()
|
||||
if not sessions:
|
||||
console.print("当前工作目录没有历史 session。")
|
||||
return
|
||||
|
||||
table = Table(show_header=True)
|
||||
table.add_column("#")
|
||||
table.add_column("session_id")
|
||||
table.add_column("title")
|
||||
table.add_column("updated_at")
|
||||
table.add_column("count")
|
||||
for index, item in enumerate(sessions, start=1):
|
||||
table.add_row(
|
||||
str(index),
|
||||
str(item.get("session_id", "")),
|
||||
str(item.get("title", "")),
|
||||
str(item.get("updated_at", "")),
|
||||
str(item.get("message_count", 0)),
|
||||
)
|
||||
console.print(table)
|
||||
|
||||
|
||||
def build_agent(
|
||||
root: Path,
|
||||
config: object,
|
||||
store: SessionStore,
|
||||
session_meta: dict[str, object],
|
||||
restored_history: list[dict[str, object]] | None = None,
|
||||
) -> Agent:
|
||||
return Agent(
|
||||
config=config,
|
||||
tools=build_default_tools(root),
|
||||
workspace=root,
|
||||
session_store=store,
|
||||
session_id=str(session_meta["session_id"]),
|
||||
history=restored_history or [],
|
||||
)
|
||||
|
||||
|
||||
def handle_repl_command(
|
||||
user_input: str,
|
||||
store: SessionStore,
|
||||
memory: MemoryStore,
|
||||
config: object,
|
||||
root: Path,
|
||||
agent: Agent,
|
||||
) -> Agent:
|
||||
command = user_input.strip()
|
||||
if command in {"--history", "/history"}:
|
||||
render_history(store)
|
||||
return agent
|
||||
|
||||
if command == "/new":
|
||||
session_meta = store.create_session(config.model)
|
||||
console.print(f"已创建新 session: {session_meta.get('session_id')}")
|
||||
return build_agent(root, config, store, session_meta, [])
|
||||
|
||||
if command == "/memory":
|
||||
content = memory.read()
|
||||
console.print(content or "当前项目还没有 memory。")
|
||||
return agent
|
||||
|
||||
if command.startswith("/remember "):
|
||||
text = user_input.split(" ", 1)[1].strip()
|
||||
if not text:
|
||||
console.print("[red]error:[/red] 缺少需要保存的 memory 内容")
|
||||
return agent
|
||||
path = memory.append(text)
|
||||
console.print(f"已写入 memory: {path}")
|
||||
return agent
|
||||
|
||||
if command.startswith("--resume ") or command.startswith("/resume "):
|
||||
target = command.split(maxsplit=1)[1].strip()
|
||||
if not target:
|
||||
console.print("[red]error:[/red] 缺少 resume 目标")
|
||||
return agent
|
||||
session_id = store.resolve_session_id(target)
|
||||
session_meta = store.load_meta(session_id)
|
||||
restored_history = store.load_messages(session_id)
|
||||
console.print(f"已恢复 session: {session_meta.get('session_id')}")
|
||||
return build_agent(root, config, store, session_meta, restored_history)
|
||||
|
||||
return agent
|
||||
|
||||
|
||||
@app.command()
|
||||
def run(
|
||||
prompt: Optional[str] = typer.Argument(None, help="单次执行的用户输入"),
|
||||
@ -22,8 +134,16 @@ def run(
|
||||
base_url: Optional[str] = typer.Option(None, help="可选的 API Base URL"),
|
||||
cwd: Path = typer.Option(Path("."), help="工作区根目录"),
|
||||
max_turns: Optional[int] = typer.Option(None, help="最大工具循环轮数"),
|
||||
history: bool = typer.Option(False, "--history", help="列出当前工作目录的历史 session"),
|
||||
resume: Optional[str] = typer.Option(None, "--resume", help="按 session id 或序号恢复历史 session"),
|
||||
) -> None:
|
||||
root = cwd.resolve()
|
||||
store = SessionStore(root)
|
||||
memory = MemoryStore(root)
|
||||
if history:
|
||||
render_history(store)
|
||||
return
|
||||
|
||||
config = resolve_config(
|
||||
root,
|
||||
{
|
||||
@ -34,10 +154,20 @@ def run(
|
||||
"max_turns": max_turns,
|
||||
},
|
||||
)
|
||||
agent = Agent(config=config, tools=build_default_tools(root), workspace=root)
|
||||
session_meta: dict[str, object]
|
||||
restored_history: list[dict[str, object]] = []
|
||||
if resume:
|
||||
session_id = store.resolve_session_id(resume)
|
||||
session_meta = store.load_meta(session_id)
|
||||
restored_history = store.load_messages(session_id)
|
||||
console.print(f"已恢复 session: {session_meta.get('session_id')}")
|
||||
else:
|
||||
session_meta = store.create_session(config.model)
|
||||
|
||||
agent = build_agent(root, config, store, session_meta, restored_history)
|
||||
|
||||
if prompt:
|
||||
console.print(agent.reply(prompt))
|
||||
render_stream(agent, prompt)
|
||||
return
|
||||
|
||||
console.print("[bold cyan]cc-slim[/bold cyan] REPL,输入 exit 或 quit 退出。")
|
||||
@ -53,8 +183,15 @@ def run(
|
||||
if not user_input.strip():
|
||||
continue
|
||||
|
||||
if user_input.strip().startswith("/") or user_input.strip().startswith("--history") or user_input.strip().startswith("--resume"):
|
||||
try:
|
||||
console.print(agent.reply(user_input))
|
||||
agent = handle_repl_command(user_input, store, memory, config, root, agent)
|
||||
except Exception as exc: # pragma: no cover
|
||||
console.print(f"[red]error:[/red] {exc}")
|
||||
continue
|
||||
|
||||
try:
|
||||
render_stream(agent, user_input)
|
||||
except Exception as exc: # pragma: no cover
|
||||
console.print(f"[red]error:[/red] {exc}")
|
||||
|
||||
|
||||
33
src/cc_slim/memory.py
Normal file
33
src/cc_slim/memory.py
Normal file
@ -0,0 +1,33 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class MemoryStore:
|
||||
def __init__(self, cwd: Path) -> None:
|
||||
self.cwd = cwd.resolve()
|
||||
self.root = Path.home() / ".config" / "cc-slim" / "memory" / self._sanitize_cwd(self.cwd)
|
||||
self.root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def append(self, text: str) -> Path:
|
||||
path = self.path()
|
||||
existing = self.read()
|
||||
content = f"{existing}\n\n{text.strip()}" if existing else text.strip()
|
||||
path.write_text(content + "\n", encoding="utf-8")
|
||||
return path
|
||||
|
||||
def read(self) -> str:
|
||||
path = self.path()
|
||||
if not path.exists():
|
||||
return ""
|
||||
return path.read_text(encoding="utf-8").strip()
|
||||
|
||||
def path(self) -> Path:
|
||||
return self.root / "MEMORY.md"
|
||||
|
||||
def _sanitize_cwd(self, cwd: Path) -> str:
|
||||
text = re.sub(r"[^A-Za-z0-9._-]+", "_", str(cwd))
|
||||
digest = hashlib.sha1(str(cwd).encode("utf-8")).hexdigest()[:8]
|
||||
return f"{text[:48]}-{digest}"
|
||||
95
src/cc_slim/session.py
Normal file
95
src/cc_slim/session.py
Normal file
@ -0,0 +1,95 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import re
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
class SessionStore:
|
||||
def __init__(self, cwd: Path) -> None:
|
||||
self.cwd = cwd.resolve()
|
||||
self.root = Path.home() / ".config" / "cc-slim" / "sessions" / self._sanitize_cwd(self.cwd)
|
||||
self.root.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def create_session(self, model: str) -> dict[str, Any]:
|
||||
now = datetime.now().strftime("%Y%m%d-%H%M%S")
|
||||
cwd_hash = hashlib.sha1(str(self.cwd).encode("utf-8")).hexdigest()[:8]
|
||||
session_id = f"{now}-{cwd_hash}"
|
||||
meta = {
|
||||
"session_id": session_id,
|
||||
"title": "",
|
||||
"cwd": str(self.cwd),
|
||||
"model": model,
|
||||
"created_at": datetime.now().isoformat(timespec="seconds"),
|
||||
"updated_at": datetime.now().isoformat(timespec="seconds"),
|
||||
"message_count": 0,
|
||||
}
|
||||
self._write_meta(session_id, meta)
|
||||
self._messages_path(session_id).write_text("", encoding="utf-8")
|
||||
return meta
|
||||
|
||||
def append_message(self, session_id: str, message: dict[str, Any]) -> None:
|
||||
with self._messages_path(session_id).open("a", encoding="utf-8") as fh:
|
||||
fh.write(json.dumps(message, ensure_ascii=False) + "\n")
|
||||
|
||||
meta = self.load_meta(session_id)
|
||||
meta["message_count"] = int(meta.get("message_count", 0)) + 1
|
||||
meta["updated_at"] = datetime.now().isoformat(timespec="seconds")
|
||||
if not meta.get("title") and message.get("role") == "user":
|
||||
meta["title"] = self._make_title(str(message.get("content", "")))
|
||||
self._write_meta(session_id, meta)
|
||||
|
||||
def list_sessions(self) -> list[dict[str, Any]]:
|
||||
sessions: list[dict[str, Any]] = []
|
||||
for path in self.root.glob("*.meta.json"):
|
||||
sessions.append(json.loads(path.read_text(encoding="utf-8")))
|
||||
sessions.sort(key=lambda item: item.get("updated_at", ""), reverse=True)
|
||||
return sessions
|
||||
|
||||
def load_messages(self, session_id: str) -> list[dict[str, Any]]:
|
||||
path = self._messages_path(session_id)
|
||||
if not path.exists():
|
||||
return []
|
||||
messages: list[dict[str, Any]] = []
|
||||
for line in path.read_text(encoding="utf-8").splitlines():
|
||||
if line.strip():
|
||||
messages.append(json.loads(line))
|
||||
return messages
|
||||
|
||||
def load_meta(self, session_id: str) -> dict[str, Any]:
|
||||
return json.loads(self._meta_path(session_id).read_text(encoding="utf-8"))
|
||||
|
||||
def resolve_session_id(self, value: str) -> str:
|
||||
sessions = self.list_sessions()
|
||||
if value.isdigit():
|
||||
index = int(value) - 1
|
||||
if 0 <= index < len(sessions):
|
||||
return str(sessions[index]["session_id"])
|
||||
raise ValueError(f"session index 不存在: {value}")
|
||||
|
||||
for item in sessions:
|
||||
session_id = str(item.get("session_id", ""))
|
||||
if session_id == value or session_id.startswith(value):
|
||||
return session_id
|
||||
raise ValueError(f"session 不存在: {value}")
|
||||
|
||||
def _messages_path(self, session_id: str) -> Path:
|
||||
return self.root / f"{session_id}.jsonl"
|
||||
|
||||
def _meta_path(self, session_id: str) -> Path:
|
||||
return self.root / f"{session_id}.meta.json"
|
||||
|
||||
def _write_meta(self, session_id: str, meta: dict[str, Any]) -> None:
|
||||
self._meta_path(session_id).write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
|
||||
|
||||
def _sanitize_cwd(self, cwd: Path) -> str:
|
||||
text = re.sub(r"[^A-Za-z0-9._-]+", "_", str(cwd))
|
||||
digest = hashlib.sha1(str(cwd).encode("utf-8")).hexdigest()[:8]
|
||||
return f"{text[:48]}-{digest}"
|
||||
|
||||
def _make_title(self, text: str) -> str:
|
||||
compact = " ".join(text.strip().split()) or "新会话"
|
||||
return compact[:48]
|
||||
@ -43,6 +43,50 @@ def build_default_tools(workspace: Path) -> list[Tool]:
|
||||
},
|
||||
execute=lambda data: glob_tool(workspace, data),
|
||||
),
|
||||
Tool(
|
||||
name="Edit",
|
||||
description="覆盖已有文本文件的完整内容。",
|
||||
input_schema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"path": {"type": "string", "description": "相对工作区的文件路径"},
|
||||
"content": {"type": "string", "description": "替换后的完整文件内容"},
|
||||
"encoding": {"type": "string", "description": "文件编码,默认 utf-8"},
|
||||
},
|
||||
"required": ["path", "content"],
|
||||
"additionalProperties": False,
|
||||
},
|
||||
execute=lambda data: edit_tool(workspace, data),
|
||||
),
|
||||
Tool(
|
||||
name="Write",
|
||||
description="创建或覆盖工作区内的文本文件。",
|
||||
input_schema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"path": {"type": "string", "description": "相对工作区的文件路径"},
|
||||
"content": {"type": "string", "description": "要写入的完整文本内容"},
|
||||
"encoding": {"type": "string", "description": "文件编码,默认 utf-8"},
|
||||
},
|
||||
"required": ["path", "content"],
|
||||
"additionalProperties": False,
|
||||
},
|
||||
execute=lambda data: write_tool(workspace, data),
|
||||
),
|
||||
Tool(
|
||||
name="Grep",
|
||||
description="在文件或目录中搜索普通文本内容。",
|
||||
input_schema={
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"pattern": {"type": "string", "description": "要搜索的普通文本字符串"},
|
||||
"path": {"type": "string", "description": "可选,仅支持文件路径或目录路径,默认当前工作区"},
|
||||
},
|
||||
"required": ["pattern"],
|
||||
"additionalProperties": False,
|
||||
},
|
||||
execute=lambda data: grep_tool(workspace, data),
|
||||
),
|
||||
Tool(
|
||||
name="Bash",
|
||||
description="在工作区中执行一条 shell 命令。",
|
||||
@ -82,6 +126,60 @@ def glob_tool(workspace: Path, data: dict[str, Any]) -> str:
|
||||
return "\n".join(matches[:200])
|
||||
|
||||
|
||||
def edit_tool(workspace: Path, data: dict[str, Any]) -> str:
|
||||
path = _safe_path(workspace, str(data["path"]))
|
||||
if not path.exists():
|
||||
return f"文件不存在: {path.relative_to(workspace)}"
|
||||
if path.is_dir():
|
||||
return f"目标是目录,不能编辑文件: {path.relative_to(workspace)}"
|
||||
|
||||
encoding = str(data.get("encoding") or "utf-8")
|
||||
path.write_text(str(data["content"]), encoding=encoding)
|
||||
return f"已修改文件: {path.relative_to(workspace)}"
|
||||
|
||||
|
||||
def write_tool(workspace: Path, data: dict[str, Any]) -> str:
|
||||
path = _safe_path(workspace, str(data["path"]))
|
||||
if path.exists() and path.is_dir():
|
||||
return f"目标是目录,不能写入文件: {path.relative_to(workspace)}"
|
||||
if not path.parent.exists():
|
||||
return f"父目录不存在: {path.parent.relative_to(workspace)}"
|
||||
|
||||
existed = path.exists()
|
||||
encoding = str(data.get("encoding") or "utf-8")
|
||||
path.write_text(str(data["content"]), encoding=encoding)
|
||||
action = "已覆盖" if existed else "已创建"
|
||||
return f"{action}文件: {path.relative_to(workspace)}"
|
||||
|
||||
|
||||
def grep_tool(workspace: Path, data: dict[str, Any]) -> str:
|
||||
pattern = str(data["pattern"])
|
||||
target = _safe_path(workspace, str(data.get("path") or "."))
|
||||
if not target.exists():
|
||||
return f"文件不存在: {target.relative_to(workspace)}"
|
||||
|
||||
results: list[str] = []
|
||||
files = [target] if target.is_file() else [path for path in target.rglob("*") if path.is_file()]
|
||||
if target.is_dir() and not files:
|
||||
return f"目录为空: {target.relative_to(workspace)}"
|
||||
|
||||
for file_path in files:
|
||||
try:
|
||||
text = file_path.read_text(encoding="utf-8", errors="replace")
|
||||
except OSError:
|
||||
continue
|
||||
|
||||
for line_no, line in enumerate(text.splitlines(), start=1):
|
||||
if pattern in line:
|
||||
results.append(f"{file_path.relative_to(workspace)}:{line_no}: {line}")
|
||||
if len(results) >= 20:
|
||||
return "\n".join(results)
|
||||
|
||||
if not results:
|
||||
return "未找到匹配内容"
|
||||
return "\n".join(results)
|
||||
|
||||
|
||||
def bash_tool(workspace: Path, data: dict[str, Any]) -> str:
|
||||
command = str(data["command"])
|
||||
proc = subprocess.run(
|
||||
@ -90,6 +188,8 @@ def bash_tool(workspace: Path, data: dict[str, Any]) -> str:
|
||||
shell=True,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
encoding="utf-8",
|
||||
errors="replace",
|
||||
timeout=60,
|
||||
)
|
||||
payload = {
|
||||
|
||||
67
tests/test_config.py
Normal file
67
tests/test_config.py
Normal file
@ -0,0 +1,67 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
|
||||
|
||||
from cc_slim.engine import resolve_config
|
||||
|
||||
|
||||
def test_resolve_config_uses_defaults(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
|
||||
monkeypatch.setenv("OPENAI_API_KEY", "env-openai-key")
|
||||
|
||||
config = resolve_config(tmp_path, {})
|
||||
|
||||
assert config.provider == "openai"
|
||||
assert config.model == "gpt-4.1-mini"
|
||||
assert config.api_key == "env-openai-key"
|
||||
assert config.base_url is None
|
||||
assert config.max_turns == 12
|
||||
|
||||
|
||||
def test_resolve_config_priority_cli_over_env_over_file(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
|
||||
(tmp_path / ".cc-slim.toml").write_text(
|
||||
"""
|
||||
[cc_slim]
|
||||
provider = "anthropic"
|
||||
model = "file-model"
|
||||
api_key = "file-key"
|
||||
base_url = "https://file.example"
|
||||
max_turns = 4
|
||||
""".strip(),
|
||||
encoding="utf-8",
|
||||
)
|
||||
monkeypatch.setenv("CC_SLIM_PROVIDER", "openai")
|
||||
monkeypatch.setenv("CC_SLIM_MODEL", "env-model")
|
||||
monkeypatch.setenv("CC_SLIM_API_KEY", "env-key")
|
||||
monkeypatch.setenv("CC_SLIM_BASE_URL", "https://env.example")
|
||||
monkeypatch.setenv("CC_SLIM_MAX_TURNS", "9")
|
||||
|
||||
config = resolve_config(
|
||||
tmp_path,
|
||||
{
|
||||
"provider": "anthropic",
|
||||
"model": "cli-model",
|
||||
"api_key": "cli-key",
|
||||
"base_url": "https://cli.example",
|
||||
"max_turns": 15,
|
||||
},
|
||||
)
|
||||
|
||||
assert config.provider == "anthropic"
|
||||
assert config.model == "cli-model"
|
||||
assert config.api_key == "cli-key"
|
||||
assert config.base_url == "https://cli.example"
|
||||
assert config.max_turns == 15
|
||||
|
||||
|
||||
def test_resolve_config_requires_api_key(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
|
||||
monkeypatch.delenv("CC_SLIM_API_KEY", raising=False)
|
||||
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
|
||||
monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
|
||||
|
||||
with pytest.raises(ValueError, match="缺少 API key"):
|
||||
resolve_config(tmp_path, {})
|
||||
57
tests/test_tools.py
Normal file
57
tests/test_tools.py
Normal file
@ -0,0 +1,57 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
|
||||
|
||||
from cc_slim.tools import (
|
||||
_safe_path,
|
||||
bash_tool,
|
||||
edit_tool,
|
||||
glob_tool,
|
||||
grep_tool,
|
||||
read_tool,
|
||||
write_tool,
|
||||
)
|
||||
|
||||
|
||||
def test_safe_path_allows_workspace_children(tmp_path: Path) -> None:
|
||||
path = _safe_path(tmp_path, "a/b.txt")
|
||||
|
||||
assert path == (tmp_path / "a/b.txt").resolve()
|
||||
|
||||
|
||||
def test_safe_path_blocks_escape(tmp_path: Path) -> None:
|
||||
with pytest.raises(ValueError, match="路径越过工作区边界"):
|
||||
_safe_path(tmp_path, "../outside.txt")
|
||||
|
||||
|
||||
def test_write_edit_read_glob_grep_flow(tmp_path: Path) -> None:
|
||||
created = write_tool(tmp_path, {"path": "hello.py", "content": "print('hello')\n"})
|
||||
edited = edit_tool(tmp_path, {"path": "hello.py", "content": "print('world')\n"})
|
||||
read_back = read_tool(tmp_path, {"path": "hello.py"})
|
||||
globbed = glob_tool(tmp_path, {"pattern": "*.py"})
|
||||
grepped = grep_tool(tmp_path, {"pattern": "world", "path": "."})
|
||||
|
||||
assert created == "已创建文件: hello.py"
|
||||
assert edited == "已修改文件: hello.py"
|
||||
assert "1: print('world')" in read_back
|
||||
assert globbed == "hello.py"
|
||||
assert "hello.py:1: print('world')" in grepped
|
||||
|
||||
|
||||
def test_bash_tool_success(tmp_path: Path) -> None:
|
||||
result = bash_tool(tmp_path, {"command": "cmd /c exit 0"})
|
||||
payload = json.loads(result)
|
||||
|
||||
assert payload["returncode"] == 0
|
||||
|
||||
|
||||
def test_edit_requires_existing_file(tmp_path: Path) -> None:
|
||||
result = edit_tool(tmp_path, {"path": "missing.txt", "content": "x"})
|
||||
|
||||
assert result == "文件不存在: missing.txt"
|
||||
Loading…
Reference in New Issue
Block a user