Compare commits

...

8 Commits

11 changed files with 659 additions and 20 deletions

2
.gitignore vendored
View File

@ -14,7 +14,7 @@ venv/
*.log *.log
logs/ logs/
.tmp/ .tmp/
test/
# IDE # IDE
.vscode/ .vscode/
.idea/ .idea/

View File

@ -21,6 +21,16 @@
4. 使用工具后重新判断结果。 4. 使用工具后重新判断结果。
5. 持续循环,直到得到最终答案或出现必须由用户补充的信息。 5. 持续循环,直到得到最终答案或出现必须由用户补充的信息。
当前 harness 是极简实现,优先最小动作,不做不必要的重复试错。
行动时必须以运行时注入的环境信息为准特别是平台、shell、工作目录和可用工具列表。
REPL 内的会话管理命令优先使用 slash command。
`/history`、`/resume`、`/new` 由程序直接处理,不进入 agent loop。
当前项目支持最小 memory使用 `/remember` 保存长期信息,使用 `/memory` 查看。
未明确说明时,使用以下默认值: 未明确说明时,使用以下默认值:
- 工作目录:当前进程目录 - 工作目录:当前进程目录
@ -33,8 +43,18 @@
## 工具使用规则 ## 工具使用规则
- `Read`:用于读取文件内容。 - `Read`:用于读取文件内容。
- `Glob`:用于按模式查找文件。 - 需要按文件名或路径模式查找时,优先使用 `Glob`
- 需要搜索文件内容时,优先使用 `Grep`
- 修改已有文件内容时,优先使用 `Edit` 工具。
- 创建新文件时,优先使用 `Write` 工具。
- `Bash`:用于执行必须通过 shell 完成的最小命令。 - `Bash`:用于执行必须通过 shell 完成的最小命令。
- 只有在确实需要复杂 shell 特性时才使用 Bash。
- 不要用 Bash 拼接文件内容。
- Windows 环境下优先使用兼容写法,不默认使用 `cat <<EOF`、`ls -la` 等 Unix 风格写法。
- 同一写入或创建目标,最多尝试 2 种不同 Bash 方案。
- 如果两次 Bash 方案失败,应立即收敛:
- 先检查路径、文件状态、shell 兼容性。
- 如仍不确定,则询问用户,而不是继续盲试。
- 不能虚构工具输出。 - 不能虚构工具输出。
- 不能在未验证前声称文件存在、命令成功或修改已生效。 - 不能在未验证前声称文件存在、命令成功或修改已生效。
@ -42,5 +62,5 @@
- 对不确定性保持诚实。 - 对不确定性保持诚实。
- 需要时引用具体文件或命令。 - 需要时引用具体文件或命令。
- 默认保持简短,除非用户要求展开 - 输出保持简短直接,不夸大成功状态
- 若受阻,只询问当前缺失的关键信息。 - 若受阻,只询问当前缺失的关键信息。

View File

@ -20,6 +20,12 @@
- 重构时: - 重构时:
- 先找全部用法 - 先找全部用法
- 避免修改无关文件 - 避免修改无关文件
- 搜索文件内容时,优先使用 `Grep`,而不是 `Bash`
- 修改已有文件时,优先使用 `Edit`,而不是 `Bash``Write`
- 需要创建或覆盖文件时,优先使用 `Write` 工具,而不是 `Bash`
- 会话管理优先使用 slash command而不是自然语言或 `Bash` 探查 session 文件
- 长期有价值的项目约束或偏好,优先使用 `/remember` 保存
- Windows 下先验证 shell 兼容性,再选择命令写法
## 沟通风格 ## 沟通风格

View File

@ -24,10 +24,13 @@ packages = ["src/cc_slim"]
[tool.cc_slim] [tool.cc_slim]
provider = "openai" provider = "openai"
model = "gpt-4.1-mini" model = "gpt-4.1-mini"
max_turns = 8 max_turns = 12
[dependency-groups] [dependency-groups]
dev = [ dev = [
"pytest>=9.0.3", "pytest>=9.0.3",
"ruff>=0.15.10", "ruff>=0.15.10",
] ]
[tool.pytest.ini_options]
testpaths = ["tests"]

View File

@ -2,14 +2,19 @@ from __future__ import annotations
import json import json
import os import os
import platform
import sys
import tomllib import tomllib
from dataclasses import dataclass from dataclasses import dataclass
from json import JSONDecodeError
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any, Iterator
from anthropic import Anthropic from anthropic import Anthropic
from openai import OpenAI from openai import OpenAI
from cc_slim.memory import MemoryStore
from cc_slim.session import SessionStore
from cc_slim.tools import Tool from cc_slim.tools import Tool
@ -19,7 +24,7 @@ class Config:
model: str model: str
api_key: str api_key: str
base_url: str | None base_url: str | None
max_turns: int = 8 max_turns: int = 12
def resolve_config(workspace: Path, cli: dict[str, Any]) -> Config: def resolve_config(workspace: Path, cli: dict[str, Any]) -> Config:
@ -39,7 +44,7 @@ def resolve_config(workspace: Path, cli: dict[str, Any]) -> Config:
"", "",
) )
base_url = _pick(cli.get("base_url"), os.getenv("CC_SLIM_BASE_URL"), file_cfg.get("base_url"), None) base_url = _pick(cli.get("base_url"), os.getenv("CC_SLIM_BASE_URL"), file_cfg.get("base_url"), None)
max_turns_raw = _pick(cli.get("max_turns"), os.getenv("CC_SLIM_MAX_TURNS"), file_cfg.get("max_turns"), 8) max_turns_raw = _pick(cli.get("max_turns"), os.getenv("CC_SLIM_MAX_TURNS"), file_cfg.get("max_turns"), 12)
if not api_key: if not api_key:
raise ValueError("缺少 API key请通过 CLI、环境变量或 .cc-slim.toml 提供。") raise ValueError("缺少 API key请通过 CLI、环境变量或 .cc-slim.toml 提供。")
@ -54,25 +59,58 @@ def resolve_config(workspace: Path, cli: dict[str, Any]) -> Config:
class Agent: class Agent:
def __init__(self, config: Config, tools: list[Tool], workspace: Path) -> None: def __init__(
self,
config: Config,
tools: list[Tool],
workspace: Path,
session_store: SessionStore | None = None,
session_id: str | None = None,
history: list[dict[str, Any]] | None = None,
) -> None:
self.config = config self.config = config
self.tools = {tool.name: tool for tool in tools} self.tools = {tool.name: tool for tool in tools}
self.history: list[dict[str, Any]] = [] self.history: list[dict[str, Any]] = list(history or [])
self.session_store = session_store
self.session_id = session_id
self.system_prompt = self._build_system_prompt(workspace) self.system_prompt = self._build_system_prompt(workspace)
self.client = self._build_client() self.client = self._build_client()
def reply(self, user_input: str) -> str: def reply(self, user_input: str) -> str:
self.history.append({"role": "user", "content": user_input}) parts: list[str] = []
for event in self.stream_reply(user_input):
if event["type"] == "text":
parts.append(event["content"])
elif event["type"] == "error":
raise RuntimeError(event["message"])
return "".join(parts).strip() or "(empty response)"
def stream_reply(self, user_input: str) -> Iterator[dict[str, Any]]:
user_message = {"role": "user", "content": user_input}
self.history.append(user_message)
self._save_message(user_message)
for _ in range(self.config.max_turns): for _ in range(self.config.max_turns):
result = self._call_model() try:
if self.config.provider == "openai":
result = yield from self._stream_openai()
else:
result = yield from self._stream_anthropic()
except Exception as exc:
yield {"type": "error", "message": str(exc)}
return
self.history.append(result["assistant"]) self.history.append(result["assistant"])
self._save_message(result["assistant"])
if not result["tool_calls"]: if not result["tool_calls"]:
return result["text"].strip() or "(empty response)" yield {"type": "done"}
return
for call in result["tool_calls"]: for call in result["tool_calls"]:
yield {"type": "tool_call", "name": call["name"], "input": call["input"]}
tool_output = self._run_tool(call["name"], call["input"]) tool_output = self._run_tool(call["name"], call["input"])
yield {"type": "tool_result", "name": call["name"], "output": tool_output}
self.history.append( self.history.append(
{ {
"role": "tool", "role": "tool",
@ -81,8 +119,9 @@ class Agent:
"content": tool_output, "content": tool_output,
} }
) )
self._save_message(self.history[-1])
return "已达到最大工具循环轮数,停止执行。" yield {"type": "error", "message": "已达到最大工具循环轮数,停止执行。"}
def _build_client(self) -> Any: def _build_client(self) -> Any:
if self.config.provider == "openai": if self.config.provider == "openai":
@ -99,6 +138,8 @@ class Agent:
def _build_system_prompt(self, workspace: Path) -> str: def _build_system_prompt(self, workspace: Path) -> str:
parts: list[str] = [] parts: list[str] = []
parts.append(self._build_runtime_summary(workspace))
agents = workspace / "AGENTS.md" agents = workspace / "AGENTS.md"
if agents.exists(): if agents.exists():
parts.append(agents.read_text(encoding="utf-8")) parts.append(agents.read_text(encoding="utf-8"))
@ -108,12 +149,31 @@ class Agent:
for path in sorted(skills_dir.glob("*.md"), key=lambda p: p.name): for path in sorted(skills_dir.glob("*.md"), key=lambda p: p.name):
parts.append(path.read_text(encoding="utf-8")) parts.append(path.read_text(encoding="utf-8"))
memory = MemoryStore(workspace).read()
if memory:
parts.append(f"# Memory\n\n{memory}")
return "\n\n".join(part.strip() for part in parts if part.strip()) return "\n\n".join(part.strip() for part in parts if part.strip())
def _call_model(self) -> dict[str, Any]: def _build_runtime_summary(self, workspace: Path) -> str:
if self.config.provider == "openai": tool_names = ", ".join(self.tools.keys()) or "(none)"
return self._call_openai() shell_name = self._detect_shell()
return self._call_anthropic() return "\n".join(
[
"## 运行环境",
f"- 平台: {platform.system() or 'Unknown'}",
f"- sys.platform: {sys.platform}",
f"- shell: {shell_name}",
f"- workspace: {workspace}",
f"- 可用工具: {tool_names}",
"- 行动时必须以以上运行环境信息为准,不要默认套用 Unix/Linux 命令习惯。",
]
)
def _detect_shell(self) -> str:
if os.name == "nt":
return os.getenv("COMSPEC", "Windows shell (likely PowerShell or cmd.exe)")
return os.getenv("SHELL", "unknown shell")
def _call_openai(self) -> dict[str, Any]: def _call_openai(self) -> dict[str, Any]:
response = self.client.chat.completions.create( response = self.client.chat.completions.create(
@ -160,6 +220,51 @@ class Agent:
assistant = {"role": "assistant", "content": content_blocks, "tool_calls": tool_calls} assistant = {"role": "assistant", "content": content_blocks, "tool_calls": tool_calls}
return {"assistant": assistant, "tool_calls": tool_calls, "text": "\n".join(text_parts)} return {"assistant": assistant, "tool_calls": tool_calls, "text": "\n".join(text_parts)}
def _stream_openai(self) -> Iterator[dict[str, Any]]:
stream = self.client.chat.completions.create(
model=self.config.model,
messages=self._openai_messages(),
tools=self._openai_tools(),
tool_choice="auto",
stream=True,
)
text_parts: list[str] = []
tool_call_parts: dict[int, dict[str, Any]] = {}
for chunk in stream:
choice = chunk.choices[0] if chunk.choices else None
if choice is None:
continue
delta = choice.delta
if delta.content:
text_parts.append(delta.content)
yield {"type": "text", "content": delta.content}
for tool_delta in delta.tool_calls or []:
slot = tool_call_parts.setdefault(
tool_delta.index,
{"id": "", "name": "", "arguments": ""},
)
if tool_delta.id:
slot["id"] = tool_delta.id
if tool_delta.function and tool_delta.function.name:
slot["name"] = tool_delta.function.name
if tool_delta.function and tool_delta.function.arguments:
slot["arguments"] += tool_delta.function.arguments
tool_calls = [self._finalize_openai_tool_call(part) for _, part in sorted(tool_call_parts.items())]
text = "".join(text_parts)
assistant = {"role": "assistant", "content": text, "tool_calls": tool_calls}
return {"assistant": assistant, "tool_calls": tool_calls, "text": text}
def _stream_anthropic(self) -> Iterator[dict[str, Any]]:
result = self._call_anthropic()
if result["text"]:
yield {"type": "text", "content": result["text"]}
return result
def _run_tool(self, name: str, payload: dict[str, Any]) -> str: def _run_tool(self, name: str, payload: dict[str, Any]) -> str:
tool = self.tools.get(name) tool = self.tools.get(name)
if not tool: if not tool:
@ -248,6 +353,22 @@ class Agent:
) )
return messages return messages
def _finalize_openai_tool_call(self, part: dict[str, Any]) -> dict[str, Any]:
arguments = part.get("arguments", "") or "{}"
try:
payload = json.loads(arguments)
except JSONDecodeError as exc:
raise ValueError(f"工具参数解析失败: {exc}") from exc
return {
"id": part.get("id") or f"call_{part.get('name', 'tool')}",
"name": part.get("name") or "",
"input": payload,
}
def _save_message(self, message: dict[str, Any]) -> None:
if self.session_store and self.session_id:
self.session_store.append_message(self.session_id, message)
def _load_file_config(path: Path) -> dict[str, Any]: def _load_file_config(path: Path) -> dict[str, Any]:
if not path.exists(): if not path.exists():

View File

@ -5,14 +5,126 @@ from typing import Optional
import typer import typer
from rich.console import Console from rich.console import Console
from rich.table import Table
from cc_slim.engine import Agent, resolve_config from cc_slim.engine import Agent, resolve_config
from cc_slim.memory import MemoryStore
from cc_slim.session import SessionStore
from cc_slim.tools import build_default_tools from cc_slim.tools import build_default_tools
app = typer.Typer(add_completion=False, no_args_is_help=False) app = typer.Typer(add_completion=False, no_args_is_help=False)
console = Console() console = Console()
def render_stream(agent: Agent, user_input: str) -> None:
printed_text = False
for event in agent.stream_reply(user_input):
if event["type"] == "text":
console.print(event["content"], end="")
printed_text = True
elif event["type"] == "tool_call":
if printed_text:
console.print()
printed_text = False
console.print(f"[cyan]->[/cyan] {event['name']}({event['input']})")
elif event["type"] == "tool_result":
console.print(f"[green]✓[/green] {event['name']} done")
elif event["type"] == "error":
if printed_text:
console.print()
console.print(f"[red]error:[/red] {event['message']}")
return
elif event["type"] == "done":
console.print()
return
def render_history(store: SessionStore) -> None:
sessions = store.list_sessions()
if not sessions:
console.print("当前工作目录没有历史 session。")
return
table = Table(show_header=True)
table.add_column("#")
table.add_column("session_id")
table.add_column("title")
table.add_column("updated_at")
table.add_column("count")
for index, item in enumerate(sessions, start=1):
table.add_row(
str(index),
str(item.get("session_id", "")),
str(item.get("title", "")),
str(item.get("updated_at", "")),
str(item.get("message_count", 0)),
)
console.print(table)
def build_agent(
root: Path,
config: object,
store: SessionStore,
session_meta: dict[str, object],
restored_history: list[dict[str, object]] | None = None,
) -> Agent:
return Agent(
config=config,
tools=build_default_tools(root),
workspace=root,
session_store=store,
session_id=str(session_meta["session_id"]),
history=restored_history or [],
)
def handle_repl_command(
user_input: str,
store: SessionStore,
memory: MemoryStore,
config: object,
root: Path,
agent: Agent,
) -> Agent:
command = user_input.strip()
if command in {"--history", "/history"}:
render_history(store)
return agent
if command == "/new":
session_meta = store.create_session(config.model)
console.print(f"已创建新 session: {session_meta.get('session_id')}")
return build_agent(root, config, store, session_meta, [])
if command == "/memory":
content = memory.read()
console.print(content or "当前项目还没有 memory。")
return agent
if command.startswith("/remember "):
text = user_input.split(" ", 1)[1].strip()
if not text:
console.print("[red]error:[/red] 缺少需要保存的 memory 内容")
return agent
path = memory.append(text)
console.print(f"已写入 memory: {path}")
return agent
if command.startswith("--resume ") or command.startswith("/resume "):
target = command.split(maxsplit=1)[1].strip()
if not target:
console.print("[red]error:[/red] 缺少 resume 目标")
return agent
session_id = store.resolve_session_id(target)
session_meta = store.load_meta(session_id)
restored_history = store.load_messages(session_id)
console.print(f"已恢复 session: {session_meta.get('session_id')}")
return build_agent(root, config, store, session_meta, restored_history)
return agent
@app.command() @app.command()
def run( def run(
prompt: Optional[str] = typer.Argument(None, help="单次执行的用户输入"), prompt: Optional[str] = typer.Argument(None, help="单次执行的用户输入"),
@ -22,8 +134,16 @@ def run(
base_url: Optional[str] = typer.Option(None, help="可选的 API Base URL"), base_url: Optional[str] = typer.Option(None, help="可选的 API Base URL"),
cwd: Path = typer.Option(Path("."), help="工作区根目录"), cwd: Path = typer.Option(Path("."), help="工作区根目录"),
max_turns: Optional[int] = typer.Option(None, help="最大工具循环轮数"), max_turns: Optional[int] = typer.Option(None, help="最大工具循环轮数"),
history: bool = typer.Option(False, "--history", help="列出当前工作目录的历史 session"),
resume: Optional[str] = typer.Option(None, "--resume", help="按 session id 或序号恢复历史 session"),
) -> None: ) -> None:
root = cwd.resolve() root = cwd.resolve()
store = SessionStore(root)
memory = MemoryStore(root)
if history:
render_history(store)
return
config = resolve_config( config = resolve_config(
root, root,
{ {
@ -34,10 +154,20 @@ def run(
"max_turns": max_turns, "max_turns": max_turns,
}, },
) )
agent = Agent(config=config, tools=build_default_tools(root), workspace=root) session_meta: dict[str, object]
restored_history: list[dict[str, object]] = []
if resume:
session_id = store.resolve_session_id(resume)
session_meta = store.load_meta(session_id)
restored_history = store.load_messages(session_id)
console.print(f"已恢复 session: {session_meta.get('session_id')}")
else:
session_meta = store.create_session(config.model)
agent = build_agent(root, config, store, session_meta, restored_history)
if prompt: if prompt:
console.print(agent.reply(prompt)) render_stream(agent, prompt)
return return
console.print("[bold cyan]cc-slim[/bold cyan] REPL输入 exit 或 quit 退出。") console.print("[bold cyan]cc-slim[/bold cyan] REPL输入 exit 或 quit 退出。")
@ -53,8 +183,15 @@ def run(
if not user_input.strip(): if not user_input.strip():
continue continue
if user_input.strip().startswith("/") or user_input.strip().startswith("--history") or user_input.strip().startswith("--resume"):
try:
agent = handle_repl_command(user_input, store, memory, config, root, agent)
except Exception as exc: # pragma: no cover
console.print(f"[red]error:[/red] {exc}")
continue
try: try:
console.print(agent.reply(user_input)) render_stream(agent, user_input)
except Exception as exc: # pragma: no cover except Exception as exc: # pragma: no cover
console.print(f"[red]error:[/red] {exc}") console.print(f"[red]error:[/red] {exc}")

33
src/cc_slim/memory.py Normal file
View File

@ -0,0 +1,33 @@
from __future__ import annotations
import hashlib
import re
from pathlib import Path
class MemoryStore:
def __init__(self, cwd: Path) -> None:
self.cwd = cwd.resolve()
self.root = Path.home() / ".config" / "cc-slim" / "memory" / self._sanitize_cwd(self.cwd)
self.root.mkdir(parents=True, exist_ok=True)
def append(self, text: str) -> Path:
path = self.path()
existing = self.read()
content = f"{existing}\n\n{text.strip()}" if existing else text.strip()
path.write_text(content + "\n", encoding="utf-8")
return path
def read(self) -> str:
path = self.path()
if not path.exists():
return ""
return path.read_text(encoding="utf-8").strip()
def path(self) -> Path:
return self.root / "MEMORY.md"
def _sanitize_cwd(self, cwd: Path) -> str:
text = re.sub(r"[^A-Za-z0-9._-]+", "_", str(cwd))
digest = hashlib.sha1(str(cwd).encode("utf-8")).hexdigest()[:8]
return f"{text[:48]}-{digest}"

95
src/cc_slim/session.py Normal file
View File

@ -0,0 +1,95 @@
from __future__ import annotations
import hashlib
import json
import re
from datetime import datetime
from pathlib import Path
from typing import Any
class SessionStore:
def __init__(self, cwd: Path) -> None:
self.cwd = cwd.resolve()
self.root = Path.home() / ".config" / "cc-slim" / "sessions" / self._sanitize_cwd(self.cwd)
self.root.mkdir(parents=True, exist_ok=True)
def create_session(self, model: str) -> dict[str, Any]:
now = datetime.now().strftime("%Y%m%d-%H%M%S")
cwd_hash = hashlib.sha1(str(self.cwd).encode("utf-8")).hexdigest()[:8]
session_id = f"{now}-{cwd_hash}"
meta = {
"session_id": session_id,
"title": "",
"cwd": str(self.cwd),
"model": model,
"created_at": datetime.now().isoformat(timespec="seconds"),
"updated_at": datetime.now().isoformat(timespec="seconds"),
"message_count": 0,
}
self._write_meta(session_id, meta)
self._messages_path(session_id).write_text("", encoding="utf-8")
return meta
def append_message(self, session_id: str, message: dict[str, Any]) -> None:
with self._messages_path(session_id).open("a", encoding="utf-8") as fh:
fh.write(json.dumps(message, ensure_ascii=False) + "\n")
meta = self.load_meta(session_id)
meta["message_count"] = int(meta.get("message_count", 0)) + 1
meta["updated_at"] = datetime.now().isoformat(timespec="seconds")
if not meta.get("title") and message.get("role") == "user":
meta["title"] = self._make_title(str(message.get("content", "")))
self._write_meta(session_id, meta)
def list_sessions(self) -> list[dict[str, Any]]:
sessions: list[dict[str, Any]] = []
for path in self.root.glob("*.meta.json"):
sessions.append(json.loads(path.read_text(encoding="utf-8")))
sessions.sort(key=lambda item: item.get("updated_at", ""), reverse=True)
return sessions
def load_messages(self, session_id: str) -> list[dict[str, Any]]:
path = self._messages_path(session_id)
if not path.exists():
return []
messages: list[dict[str, Any]] = []
for line in path.read_text(encoding="utf-8").splitlines():
if line.strip():
messages.append(json.loads(line))
return messages
def load_meta(self, session_id: str) -> dict[str, Any]:
return json.loads(self._meta_path(session_id).read_text(encoding="utf-8"))
def resolve_session_id(self, value: str) -> str:
sessions = self.list_sessions()
if value.isdigit():
index = int(value) - 1
if 0 <= index < len(sessions):
return str(sessions[index]["session_id"])
raise ValueError(f"session index 不存在: {value}")
for item in sessions:
session_id = str(item.get("session_id", ""))
if session_id == value or session_id.startswith(value):
return session_id
raise ValueError(f"session 不存在: {value}")
def _messages_path(self, session_id: str) -> Path:
return self.root / f"{session_id}.jsonl"
def _meta_path(self, session_id: str) -> Path:
return self.root / f"{session_id}.meta.json"
def _write_meta(self, session_id: str, meta: dict[str, Any]) -> None:
self._meta_path(session_id).write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
def _sanitize_cwd(self, cwd: Path) -> str:
text = re.sub(r"[^A-Za-z0-9._-]+", "_", str(cwd))
digest = hashlib.sha1(str(cwd).encode("utf-8")).hexdigest()[:8]
return f"{text[:48]}-{digest}"
def _make_title(self, text: str) -> str:
compact = " ".join(text.strip().split()) or "新会话"
return compact[:48]

View File

@ -43,6 +43,50 @@ def build_default_tools(workspace: Path) -> list[Tool]:
}, },
execute=lambda data: glob_tool(workspace, data), execute=lambda data: glob_tool(workspace, data),
), ),
Tool(
name="Edit",
description="覆盖已有文本文件的完整内容。",
input_schema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "相对工作区的文件路径"},
"content": {"type": "string", "description": "替换后的完整文件内容"},
"encoding": {"type": "string", "description": "文件编码,默认 utf-8"},
},
"required": ["path", "content"],
"additionalProperties": False,
},
execute=lambda data: edit_tool(workspace, data),
),
Tool(
name="Write",
description="创建或覆盖工作区内的文本文件。",
input_schema={
"type": "object",
"properties": {
"path": {"type": "string", "description": "相对工作区的文件路径"},
"content": {"type": "string", "description": "要写入的完整文本内容"},
"encoding": {"type": "string", "description": "文件编码,默认 utf-8"},
},
"required": ["path", "content"],
"additionalProperties": False,
},
execute=lambda data: write_tool(workspace, data),
),
Tool(
name="Grep",
description="在文件或目录中搜索普通文本内容。",
input_schema={
"type": "object",
"properties": {
"pattern": {"type": "string", "description": "要搜索的普通文本字符串"},
"path": {"type": "string", "description": "可选,仅支持文件路径或目录路径,默认当前工作区"},
},
"required": ["pattern"],
"additionalProperties": False,
},
execute=lambda data: grep_tool(workspace, data),
),
Tool( Tool(
name="Bash", name="Bash",
description="在工作区中执行一条 shell 命令。", description="在工作区中执行一条 shell 命令。",
@ -82,6 +126,60 @@ def glob_tool(workspace: Path, data: dict[str, Any]) -> str:
return "\n".join(matches[:200]) return "\n".join(matches[:200])
def edit_tool(workspace: Path, data: dict[str, Any]) -> str:
path = _safe_path(workspace, str(data["path"]))
if not path.exists():
return f"文件不存在: {path.relative_to(workspace)}"
if path.is_dir():
return f"目标是目录,不能编辑文件: {path.relative_to(workspace)}"
encoding = str(data.get("encoding") or "utf-8")
path.write_text(str(data["content"]), encoding=encoding)
return f"已修改文件: {path.relative_to(workspace)}"
def write_tool(workspace: Path, data: dict[str, Any]) -> str:
path = _safe_path(workspace, str(data["path"]))
if path.exists() and path.is_dir():
return f"目标是目录,不能写入文件: {path.relative_to(workspace)}"
if not path.parent.exists():
return f"父目录不存在: {path.parent.relative_to(workspace)}"
existed = path.exists()
encoding = str(data.get("encoding") or "utf-8")
path.write_text(str(data["content"]), encoding=encoding)
action = "已覆盖" if existed else "已创建"
return f"{action}文件: {path.relative_to(workspace)}"
def grep_tool(workspace: Path, data: dict[str, Any]) -> str:
pattern = str(data["pattern"])
target = _safe_path(workspace, str(data.get("path") or "."))
if not target.exists():
return f"文件不存在: {target.relative_to(workspace)}"
results: list[str] = []
files = [target] if target.is_file() else [path for path in target.rglob("*") if path.is_file()]
if target.is_dir() and not files:
return f"目录为空: {target.relative_to(workspace)}"
for file_path in files:
try:
text = file_path.read_text(encoding="utf-8", errors="replace")
except OSError:
continue
for line_no, line in enumerate(text.splitlines(), start=1):
if pattern in line:
results.append(f"{file_path.relative_to(workspace)}:{line_no}: {line}")
if len(results) >= 20:
return "\n".join(results)
if not results:
return "未找到匹配内容"
return "\n".join(results)
def bash_tool(workspace: Path, data: dict[str, Any]) -> str: def bash_tool(workspace: Path, data: dict[str, Any]) -> str:
command = str(data["command"]) command = str(data["command"])
proc = subprocess.run( proc = subprocess.run(
@ -90,6 +188,8 @@ def bash_tool(workspace: Path, data: dict[str, Any]) -> str:
shell=True, shell=True,
capture_output=True, capture_output=True,
text=True, text=True,
encoding="utf-8",
errors="replace",
timeout=60, timeout=60,
) )
payload = { payload = {

67
tests/test_config.py Normal file
View File

@ -0,0 +1,67 @@
from __future__ import annotations
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
from cc_slim.engine import resolve_config
def test_resolve_config_uses_defaults(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
monkeypatch.setenv("OPENAI_API_KEY", "env-openai-key")
config = resolve_config(tmp_path, {})
assert config.provider == "openai"
assert config.model == "gpt-4.1-mini"
assert config.api_key == "env-openai-key"
assert config.base_url is None
assert config.max_turns == 12
def test_resolve_config_priority_cli_over_env_over_file(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
(tmp_path / ".cc-slim.toml").write_text(
"""
[cc_slim]
provider = "anthropic"
model = "file-model"
api_key = "file-key"
base_url = "https://file.example"
max_turns = 4
""".strip(),
encoding="utf-8",
)
monkeypatch.setenv("CC_SLIM_PROVIDER", "openai")
monkeypatch.setenv("CC_SLIM_MODEL", "env-model")
monkeypatch.setenv("CC_SLIM_API_KEY", "env-key")
monkeypatch.setenv("CC_SLIM_BASE_URL", "https://env.example")
monkeypatch.setenv("CC_SLIM_MAX_TURNS", "9")
config = resolve_config(
tmp_path,
{
"provider": "anthropic",
"model": "cli-model",
"api_key": "cli-key",
"base_url": "https://cli.example",
"max_turns": 15,
},
)
assert config.provider == "anthropic"
assert config.model == "cli-model"
assert config.api_key == "cli-key"
assert config.base_url == "https://cli.example"
assert config.max_turns == 15
def test_resolve_config_requires_api_key(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
monkeypatch.delenv("CC_SLIM_API_KEY", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
with pytest.raises(ValueError, match="缺少 API key"):
resolve_config(tmp_path, {})

57
tests/test_tools.py Normal file
View File

@ -0,0 +1,57 @@
from __future__ import annotations
import json
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
from cc_slim.tools import (
_safe_path,
bash_tool,
edit_tool,
glob_tool,
grep_tool,
read_tool,
write_tool,
)
def test_safe_path_allows_workspace_children(tmp_path: Path) -> None:
path = _safe_path(tmp_path, "a/b.txt")
assert path == (tmp_path / "a/b.txt").resolve()
def test_safe_path_blocks_escape(tmp_path: Path) -> None:
with pytest.raises(ValueError, match="路径越过工作区边界"):
_safe_path(tmp_path, "../outside.txt")
def test_write_edit_read_glob_grep_flow(tmp_path: Path) -> None:
created = write_tool(tmp_path, {"path": "hello.py", "content": "print('hello')\n"})
edited = edit_tool(tmp_path, {"path": "hello.py", "content": "print('world')\n"})
read_back = read_tool(tmp_path, {"path": "hello.py"})
globbed = glob_tool(tmp_path, {"pattern": "*.py"})
grepped = grep_tool(tmp_path, {"pattern": "world", "path": "."})
assert created == "已创建文件: hello.py"
assert edited == "已修改文件: hello.py"
assert "1: print('world')" in read_back
assert globbed == "hello.py"
assert "hello.py:1: print('world')" in grepped
def test_bash_tool_success(tmp_path: Path) -> None:
result = bash_tool(tmp_path, {"command": "cmd /c exit 0"})
payload = json.loads(result)
assert payload["returncode"] == 0
def test_edit_requires_existing_file(tmp_path: Path) -> None:
result = edit_tool(tmp_path, {"path": "missing.txt", "content": "x"})
assert result == "文件不存在: missing.txt"