refactor: restructure to official AgentSkills/Claude Code skill format

- Flatten colleague-creator/ to repo root (repo = skill directory)
- Update SKILL.md frontmatter with official fields: name, description,
  argument-hint, version, user-invocable, allowed-tools
- Move PRD.md → docs/PRD.md
- Add .gitignore, requirements.txt, LICENSE
- Update README and INSTALL docs to reflect new structure and git clone install

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
titanwings
2026-03-30 13:37:54 +08:00
parent 4f33f68426
commit 6a0b31aa6c
23 changed files with 123 additions and 190 deletions

tools/dingtalk_auto_collector.py Normal file

@@ -0,0 +1,787 @@
#!/usr/bin/env python3
"""
钉钉自动采集器
输入同事姓名,自动:
1. 搜索钉钉用户,获取 userId
2. 搜索他创建/编辑的文档和知识库内容
3. 拉取多维表格(如有)
4. 消息记录(API 不支持历史拉取,自动切换浏览器方案)
5. 输出统一格式,直接进 colleague-creator 分析流程
钉钉限制说明:
钉钉 Open API 不提供历史消息拉取接口,
消息记录部分自动使用 Playwright 浏览器方案采集。
前置:
pip3 install requests playwright
playwright install chromium
python3 dingtalk_auto_collector.py --setup
用法:
python3 dingtalk_auto_collector.py --name "张三" --output-dir ./knowledge/zhangsan
python3 dingtalk_auto_collector.py --name "张三" --skip-messages # 跳过消息采集
python3 dingtalk_auto_collector.py --name "张三" --doc-limit 20
"""
from __future__ import annotations
import json
import sys
import time
import argparse
import platform
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional
try:
import requests
except ImportError:
print("错误请先安装依赖pip3 install requests", file=sys.stderr)
sys.exit(1)
CONFIG_PATH = Path.home() / ".colleague-skill" / "dingtalk_config.json"
API_BASE = "https://api.dingtalk.com"
# ─── 配置 ────────────────────────────────────────────────────────────────────
def load_config() -> dict:
if not CONFIG_PATH.exists():
print("未找到配置请先运行python3 dingtalk_auto_collector.py --setup", file=sys.stderr)
sys.exit(1)
return json.loads(CONFIG_PATH.read_text(encoding="utf-8"))
def save_config(config: dict) -> None:
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
CONFIG_PATH.write_text(json.dumps(config, indent=2, ensure_ascii=False))
def setup_config() -> None:
print("=== 钉钉自动采集配置 ===\n")
print("请前往 https://open-dev.dingtalk.com 创建企业内部应用,开通以下权限:\n")
print(" 通讯录类:")
print(" qyapi_get_member_detail 查询用户详情")
print(" Contact.User.mobile 读取用户手机号(可选)")
print()
print(" 消息类(可选,仅用于发消息,历史消息需浏览器方案):")
print(" qyapi_robot_sendmsg 机器人发消息")
print()
print(" 文档类:")
print(" Doc.WorkSpace.READ 读取工作空间")
print(" Doc.File.READ 读取文件")
print()
print(" 多维表格:")
print(" Bitable.Record.READ 读取记录")
print()
app_key = input("AppKey (ding_xxx): ").strip()
app_secret = input("AppSecret: ").strip()
config = {"app_key": app_key, "app_secret": app_secret}
save_config(config)
print(f"\n✅ 配置已保存到 {CONFIG_PATH}")
print("\n注意:消息记录采集需要 Playwright请确认已安装")
print(" pip3 install playwright && playwright install chromium")
# ─── Token ───────────────────────────────────────────────────────────────────
_token_cache: dict = {}
def get_access_token(config: dict) -> str:
"""获取钉钉 access_token带缓存"""
now = time.time()
if _token_cache.get("token") and _token_cache.get("expire", 0) > now + 60:
return _token_cache["token"]
resp = requests.post(
f"{API_BASE}/v1.0/oauth2/accessToken",
json={"appKey": config["app_key"], "appSecret": config["app_secret"]},
timeout=10,
)
data = resp.json()
if "accessToken" not in data:
print(f"获取 token 失败:{data}", file=sys.stderr)
sys.exit(1)
token = data["accessToken"]
_token_cache["token"] = token
_token_cache["expire"] = now + data.get("expireIn", 7200)
return token
def api_get(path: str, params: dict, config: dict) -> dict:
token = get_access_token(config)
resp = requests.get(
f"{API_BASE}{path}",
params=params,
headers={"x-acs-dingtalk-access-token": token},
timeout=15,
)
return resp.json()
def api_post(path: str, body: dict, config: dict) -> dict:
token = get_access_token(config)
resp = requests.post(
f"{API_BASE}{path}",
json=body,
headers={"x-acs-dingtalk-access-token": token},
timeout=15,
)
return resp.json()
# ─── 用户搜索 ─────────────────────────────────────────────────────────────────
def find_user(name: str, config: dict) -> Optional[dict]:
"""通过姓名搜索钉钉用户"""
print(f" 搜索用户:{name} ...", file=sys.stderr)
data = api_post(
"/v1.0/contact/users/search",
{"searchText": name, "offset": 0, "size": 10},
config,
)
users = data.get("list", []) or data.get("result", {}).get("list", [])
if not users:
# 降级:通过部门遍历搜索
print(" API 搜索无结果,尝试遍历通讯录 ...", file=sys.stderr)
users = search_users_by_dept(name, config)
if not users:
print(f" 未找到用户:{name}", file=sys.stderr)
return None
if len(users) == 1:
u = users[0]
print(f" 找到用户:{u.get('name')}{u.get('deptNameList', [''])[0] if isinstance(u.get('deptNameList'), list) else ''}", file=sys.stderr)
return u
print(f"\n 找到 {len(users)} 个结果,请选择:")
for i, u in enumerate(users):
dept = u.get("deptNameList", [""])
dept_str = dept[0] if isinstance(dept, list) and dept else ""
print(f" [{i+1}] {u.get('name')} {dept_str} {u.get('unionId', '')}")
choice = input("\n 选择编号(默认 1):").strip() or "1"
try:
return users[int(choice) - 1]
except (ValueError, IndexError):
return users[0]
def search_users_by_dept(name: str, config: dict, dept_id: int = 1, depth: int = 0) -> list:
"""递归遍历部门搜索用户(深度限制 3 层)"""
if depth > 3:
return []
results = []
# 获取部门用户列表
data = api_post(
"/v1.0/contact/users/simplelist",
{"deptId": dept_id, "cursor": 0, "size": 100},
config,
)
users = data.get("list", [])
for u in users:
if name in u.get("name", ""):
# 获取详细信息
detail = api_get(f"/v1.0/contact/users/{u.get('userId')}", {}, config)
results.append(detail.get("result", u))
# 获取子部门
sub_data = api_get(
"/v1.0/contact/departments/listSubDepts",
{"deptId": dept_id},
config,
)
for sub in sub_data.get("result", []):
results.extend(search_users_by_dept(name, config, sub.get("deptId"), depth + 1))
return results
# ─── 文档采集 ─────────────────────────────────────────────────────────────────
def list_workspaces(config: dict) -> list:
"""获取所有工作空间"""
data = api_get("/v1.0/doc/workspaces", {"maxResults": 50}, config)
return data.get("workspaceModels", []) or data.get("result", {}).get("workspaceModels", [])
def search_docs_by_user(user_id: str, name: str, doc_limit: int, config: dict) -> list:
"""搜索用户创建的文档"""
print(f" 搜索 {name} 的文档 ...", file=sys.stderr)
# 方式一:全局搜索
data = api_post(
"/v1.0/doc/search",
{
"keyword": name,
"size": doc_limit,
"offset": 0,
},
config,
)
docs = []
items = data.get("docList", []) or data.get("result", {}).get("docList", [])
for item in items:
creator_id = item.get("creatorId", "") or item.get("creator", {}).get("userId", "")
# 过滤:只保留目标用户创建的
if user_id and creator_id and creator_id != user_id:
continue
docs.append({
"title": item.get("title", "无标题"),
"docId": item.get("docId", ""),
"spaceId": item.get("spaceId", ""),
"type": item.get("docType", ""),
"url": item.get("shareUrl", ""),
"creator": item.get("creatorName", name),
})
if not docs:
# 方式二:遍历工作空间找文档
print(" 搜索无结果,遍历工作空间 ...", file=sys.stderr)
workspaces = list_workspaces(config)
for ws in workspaces[:5]: # 最多查 5 个空间
ws_id = ws.get("spaceId") or ws.get("workspaceId")
if not ws_id:
continue
files_data = api_get(
f"/v1.0/doc/workspaces/{ws_id}/files",
{"maxResults": 20, "orderBy": "modified_time", "order": "DESC"},
config,
)
for f in files_data.get("files", []):
creator_id = f.get("creatorId", "")
if user_id and creator_id and creator_id != user_id:
continue
docs.append({
"title": f.get("fileName", "无标题"),
"docId": f.get("docId", ""),
"spaceId": ws_id,
"type": f.get("docType", ""),
"url": f.get("shareUrl", ""),
"creator": name,
})
print(f" 找到 {len(docs)} 篇文档", file=sys.stderr)
return docs[:doc_limit]
def fetch_doc_content(doc_id: str, space_id: str, config: dict) -> str:
"""拉取单篇文档的文本内容"""
# 方式一:直接获取文档内容
data = api_get(
f"/v1.0/doc/workspaces/{space_id}/files/{doc_id}/content",
{},
config,
)
content = (
data.get("content")
or data.get("result", {}).get("content")
or data.get("markdown")
or data.get("result", {}).get("markdown")
or ""
)
if content:
return content
# 方式二:获取下载链接后下载
dl_data = api_get(
f"/v1.0/doc/workspaces/{space_id}/files/{doc_id}/download",
{},
config,
)
dl_url = dl_data.get("downloadUrl") or dl_data.get("result", {}).get("downloadUrl")
if dl_url:
try:
resp = requests.get(dl_url, timeout=15)
return resp.text
except Exception:
pass
return ""
def collect_docs(user: dict, doc_limit: int, config: dict) -> str:
"""采集目标用户的文档"""
user_id = user.get("userId", "")
name = user.get("name", "")
docs = search_docs_by_user(user_id, name, doc_limit, config)
if not docs:
return f"# 文档内容\n\n未找到 {name} 相关文档\n"
lines = [
"# 文档内容(钉钉自动采集)",
f"目标:{name}",
f"{len(docs)}",
"",
]
for doc in docs:
title = doc.get("title", "无标题")
doc_id = doc.get("docId", "")
space_id = doc.get("spaceId", "")
url = doc.get("url", "")
if not doc_id or not space_id:
continue
print(f" 拉取文档:{title} ...", file=sys.stderr)
content = fetch_doc_content(doc_id, space_id, config)
if not content or len(content.strip()) < 20:
print(f" 内容为空,跳过", file=sys.stderr)
continue
lines += [
"---",
f"## 《{title}",
f"链接:{url}",
f"创建人:{doc.get('creator', '')}",
"",
content.strip(),
"",
]
return "\n".join(lines)
# ─── 多维表格 ─────────────────────────────────────────────────────────────────
def search_bitables(user_id: str, name: str, config: dict) -> list:
"""搜索目标用户的多维表格"""
print(f" 搜索 {name} 的多维表格 ...", file=sys.stderr)
data = api_post(
"/v1.0/doc/search",
{"keyword": name, "size": 20, "offset": 0, "docTypes": ["bitable"]},
config,
)
tables = []
for item in data.get("docList", []):
if item.get("docType") != "bitable":
continue
creator_id = item.get("creatorId", "")
if user_id and creator_id and creator_id != user_id:
continue
tables.append(item)
print(f" 找到 {len(tables)} 个多维表格", file=sys.stderr)
return tables
def fetch_bitable_content(base_id: str, config: dict) -> str:
"""拉取多维表格内容"""
# 获取所有 sheet
sheets_data = api_get(
f"/v1.0/bitable/bases/{base_id}/sheets",
{},
config,
)
sheets = sheets_data.get("sheets", []) or sheets_data.get("result", {}).get("sheets", [])
if not sheets:
return "(多维表格为空或无权限)\n"
lines = []
for sheet in sheets:
sheet_id = sheet.get("sheetId") or sheet.get("id")
sheet_name = sheet.get("name", sheet_id)
# 获取字段
fields_data = api_get(
f"/v1.0/bitable/bases/{base_id}/sheets/{sheet_id}/fields",
{"maxResults": 100},
config,
)
fields = [f.get("name", "") for f in fields_data.get("fields", [])]
# 获取记录
records_data = api_get(
f"/v1.0/bitable/bases/{base_id}/sheets/{sheet_id}/records",
{"maxResults": 200},
config,
)
records = records_data.get("records", []) or records_data.get("result", {}).get("records", [])
lines.append(f"### 表:{sheet_name}")
lines.append("")
if fields:
lines.append("| " + " | ".join(fields) + " |")
lines.append("| " + " | ".join(["---"] * len(fields)) + " |")
for rec in records:
row_data = rec.get("fields", {})
row = []
for f in fields:
val = row_data.get(f, "")
if isinstance(val, list):
val = " ".join(
v.get("text", str(v)) if isinstance(v, dict) else str(v)
for v in val
)
row.append(str(val).replace("|", "").replace("\n", " "))
lines.append("| " + " | ".join(row) + " |")
lines.append("")
return "\n".join(lines)
def collect_bitables(user: dict, config: dict) -> str:
"""采集目标用户的多维表格"""
user_id = user.get("userId", "")
name = user.get("name", "")
tables = search_bitables(user_id, name, config)
if not tables:
return f"# 多维表格\n\n未找到 {name} 的多维表格\n"
lines = [
"# 多维表格(钉钉自动采集)",
f"目标:{name}",
f"{len(tables)}",
"",
]
for t in tables:
title = t.get("title", "无标题")
doc_id = t.get("docId", "")
print(f" 拉取多维表格:{title} ...", file=sys.stderr)
content = fetch_bitable_content(doc_id, config)
lines += [
"---",
f"## 《{title}",
"",
content,
]
return "\n".join(lines)
# ─── 消息记录(浏览器方案)────────────────────────────────────────────────────
def get_default_chrome_profile() -> str:
system = platform.system()
if system == "Darwin":
return str(Path.home() / "Library/Application Support/Google/Chrome/Default")
elif system == "Linux":
return str(Path.home() / ".config/google-chrome/Default")
elif system == "Windows":
import os
return str(Path(os.environ.get("LOCALAPPDATA", "")) / "Google/Chrome/User Data/Default")
return str(Path.home() / ".config/google-chrome/Default")
def collect_messages_browser(
name: str,
msg_limit: int,
chrome_profile: Optional[str],
headless: bool,
) -> str:
"""通过 Playwright 浏览器抓取钉钉网页版消息记录"""
try:
from playwright.sync_api import sync_playwright
except ImportError:
return (
"# 消息记录\n\n"
"⚠️ 未安装 Playwright无法采集消息记录。\n"
"请运行pip3 install playwright && playwright install chromium\n"
)
import re
profile = chrome_profile or get_default_chrome_profile()
print(f" 启动浏览器抓取钉钉消息({'无头' if headless else '有界面'}...", file=sys.stderr)
messages = []
with sync_playwright() as p:
try:
ctx = p.chromium.launch_persistent_context(
user_data_dir=profile,
headless=headless,
args=["--disable-blink-features=AutomationControlled"],
ignore_default_args=["--enable-automation"],
viewport={"width": 1280, "height": 900},
)
except Exception as e:
return f"# 消息记录\n\n⚠️ 无法启动浏览器:{e}\n"
page = ctx.new_page()
# 打开钉钉网页版
page.goto("https://im.dingtalk.com", wait_until="domcontentloaded", timeout=20000)
time.sleep(3)
# 检查登录状态
if "login" in page.url.lower() or page.query_selector(".login-wrap"):
if headless:
ctx.close()
return (
"# 消息记录\n\n"
"⚠️ 检测到未登录。请用 --show-browser 参数重新运行,在弹出窗口中登录钉钉。\n"
)
print(" 请在浏览器中登录钉钉,登录完成后按回车继续...", file=sys.stderr)
input()
# 搜索目标联系人的消息
try:
# 点击搜索框
search_selectors = [
'[placeholder*="搜索"]',
'.search-input',
'[data-testid="search"]',
'.im-search',
]
for sel in search_selectors:
el = page.query_selector(sel)
if el:
el.click()
time.sleep(0.5)
page.keyboard.type(name)
time.sleep(2)
break
# 点击第一个结果
result_selectors = [
'.search-result-item',
'.contact-item',
'.result-item',
]
for sel in result_selectors:
result = page.query_selector(sel)
if result:
result.click()
time.sleep(2)
break
except Exception as e:
print(f" 自动导航失败:{e}", file=sys.stderr)
if not headless:
print(f" 请手动打开与「{name}」的对话,然后按回车继续...", file=sys.stderr)
input()
# 向上滚动加载历史消息
print(" 加载历史消息 ...", file=sys.stderr)
for _ in range(15):
page.keyboard.press("Control+Home")
time.sleep(1)
page.evaluate("window.scrollTo(0, 0)")
time.sleep(0.8)
time.sleep(2)
# 提取消息
raw_messages = page.evaluate(f"""
() => {{
const target = "{name}";
const results = [];
const selectors = [
'.message-item-content-container',
'.im-message-item',
'[data-message-id]',
'.msg-wrap',
];
let items = [];
for (const sel of selectors) {{
items = document.querySelectorAll(sel);
if (items.length > 0) break;
}}
items.forEach(item => {{
const senderEl = item.querySelector('.sender-name, .nick-name, .name');
const contentEl = item.querySelector(
'.message-text, .text-content, .msg-content, .im-richtext'
);
const timeEl = item.querySelector('.message-time, .time, .msg-time');
const sender = senderEl ? senderEl.innerText.trim() : '';
const content = contentEl ? contentEl.innerText.trim() : '';
const time = timeEl ? timeEl.innerText.trim() : '';
if (!content) return;
if (target && !sender.includes(target)) return;
if (['[图片]','[文件]','[表情]','[语音]'].includes(content)) return;
results.push({{ sender, content, time }});
}});
return results.slice(-{msg_limit});
}}
""")
ctx.close()
messages = raw_messages or []
if not messages:
return (
"# 消息记录\n\n"
f"⚠️ 未能自动提取 {name} 的消息。\n"
"可能原因:钉钉网页版 DOM 结构变化,或未找到对话。\n"
"建议手动截图聊天记录后上传。\n"
)
long_msgs = [m for m in messages if len(m.get("content", "")) > 50]
short_msgs = [m for m in messages if len(m.get("content", "")) <= 50]
lines = [
"# 消息记录(钉钉浏览器采集)",
f"目标:{name}",
f"{len(messages)}",
"注意:钉钉 API 不支持历史消息拉取,本内容通过浏览器采集",
"",
"---",
"",
"## 长消息(观点/决策/技术类)",
"",
]
for m in long_msgs:
lines.append(f"[{m.get('time', '')}] {m.get('content', '')}")
lines.append("")
lines += ["---", "", "## 日常消息(风格参考)", ""]
for m in short_msgs[:300]:
lines.append(f"[{m.get('time', '')}] {m.get('content', '')}")
return "\n".join(lines)
# ─── 主流程 ───────────────────────────────────────────────────────────────────
def collect_all(
name: str,
output_dir: Path,
msg_limit: int,
doc_limit: int,
skip_messages: bool,
chrome_profile: Optional[str],
headless: bool,
config: dict,
) -> dict:
output_dir.mkdir(parents=True, exist_ok=True)
results = {}
print(f"\n🔍 开始采集(钉钉):{name}\n", file=sys.stderr)
# Step 1: 搜索用户
user = find_user(name, config)
if not user:
print(f"❌ 未找到用户:{name}", file=sys.stderr)
sys.exit(1)
print(f" 用户 ID{user.get('userId', '')} 部门:{user.get('deptNameList', [''])[0] if isinstance(user.get('deptNameList'), list) and user.get('deptNameList') else ''}", file=sys.stderr)
# Step 2: 文档
print(f"\n📄 采集文档(上限 {doc_limit} 篇)...", file=sys.stderr)
try:
doc_content = collect_docs(user, doc_limit, config)
doc_path = output_dir / "docs.txt"
doc_path.write_text(doc_content, encoding="utf-8")
results["docs"] = str(doc_path)
print(f" ✅ 文档 → {doc_path}", file=sys.stderr)
except Exception as e:
print(f" ⚠️ 文档采集失败:{e}", file=sys.stderr)
# Step 3: 多维表格
print(f"\n📊 采集多维表格 ...", file=sys.stderr)
try:
bitable_content = collect_bitables(user, config)
bt_path = output_dir / "bitables.txt"
bt_path.write_text(bitable_content, encoding="utf-8")
results["bitables"] = str(bt_path)
print(f" ✅ 多维表格 → {bt_path}", file=sys.stderr)
except Exception as e:
print(f" ⚠️ 多维表格采集失败:{e}", file=sys.stderr)
# Step 4: 消息记录(浏览器方案)
if not skip_messages:
print(f"\n📨 采集消息记录(浏览器方案,上限 {msg_limit} 条)...", file=sys.stderr)
print(f" 钉钉 API 不支持历史消息拉取,自动切换浏览器方案", file=sys.stderr)
try:
msg_content = collect_messages_browser(name, msg_limit, chrome_profile, headless)
msg_path = output_dir / "messages.txt"
msg_path.write_text(msg_content, encoding="utf-8")
results["messages"] = str(msg_path)
print(f" ✅ 消息记录 → {msg_path}", file=sys.stderr)
except Exception as e:
print(f" ⚠️ 消息采集失败:{e}", file=sys.stderr)
else:
print(f"\n📨 跳过消息采集(--skip-messages", file=sys.stderr)
# 写摘要
summary = {
"name": name,
"user_id": user.get("userId", ""),
"platform": "dingtalk",
"department": user.get("deptNameList", []),
"collected_at": datetime.now(timezone.utc).isoformat(),
"files": results,
"notes": "消息记录通过浏览器采集,钉钉 API 不支持历史消息拉取",
}
(output_dir / "collection_summary.json").write_text(
json.dumps(summary, ensure_ascii=False, indent=2)
)
print(f"\n✅ 采集完成 → {output_dir}", file=sys.stderr)
print(f" 文件:{', '.join(results.keys())}", file=sys.stderr)
return results
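# Expected output layout after a successful run (a sketch; which files exist depends
# on what succeeded and on --skip-messages):
#
#   knowledge/zhangsan/
#     docs.txt                   # documents pulled via the Open API
#     bitables.txt               # bitables rendered as Markdown tables
#     messages.txt               # messages scraped via the Playwright fallback
#     collection_summary.json    # name / user_id / platform / collected_at / files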
def main() -> None:
parser = argparse.ArgumentParser(description="钉钉数据自动采集器")
parser.add_argument("--setup", action="store_true", help="初始化配置")
parser.add_argument("--name", help="同事姓名")
parser.add_argument("--output-dir", default=None, help="输出目录")
parser.add_argument("--msg-limit", type=int, default=500, help="最多采集消息条数(默认 500")
parser.add_argument("--doc-limit", type=int, default=20, help="最多采集文档篇数(默认 20")
parser.add_argument("--skip-messages", action="store_true", help="跳过消息记录采集")
parser.add_argument("--chrome-profile", default=None, help="Chrome Profile 路径")
parser.add_argument("--show-browser", action="store_true", help="显示浏览器窗口(调试/首次登录)")
args = parser.parse_args()
if args.setup:
setup_config()
return
if not args.name:
parser.error("请提供 --name")
config = load_config()
output_dir = Path(args.output_dir) if args.output_dir else Path(f"./knowledge/{args.name}")
collect_all(
name=args.name,
output_dir=output_dir,
msg_limit=args.msg_limit,
doc_limit=args.doc_limit,
skip_messages=args.skip_messages,
chrome_profile=args.chrome_profile,
headless=not args.show_browser,
config=config,
)
if __name__ == "__main__":
main()

339 tools/email_parser.py Normal file

@@ -0,0 +1,339 @@
#!/usr/bin/env python3
"""
邮件解析器
支持格式:
1. .eml 文件(标准邮件格式)
2. .txt 文件(纯文本邮件记录)
3. .mbox 文件(多封邮件合集)
用法:
python email_parser.py --file emails.eml --target "zhangsan@company.com" --output output.txt
python email_parser.py --file inbox.mbox --target "张三" --output output.txt
"""
import email
import email.policy
import mailbox
import re
import sys
import argparse
from pathlib import Path
from email.header import decode_header
from html.parser import HTMLParser
class HTMLTextExtractor(HTMLParser):
"""从 HTML 邮件内容中提取纯文本"""
def __init__(self):
super().__init__()
self.result = []
self._skip = False
def handle_starttag(self, tag, attrs):
if tag in ("script", "style"):
self._skip = True
def handle_endtag(self, tag):
if tag in ("script", "style"):
self._skip = False
if tag in ("p", "br", "div", "tr"):
self.result.append("\n")
def handle_data(self, data):
if not self._skip:
self.result.append(data)
def get_text(self):
return re.sub(r"\n{3,}", "\n\n", "".join(self.result)).strip()
def decode_mime_str(s: str) -> str:
"""解码 MIME 编码的邮件头字段"""
if not s:
return ""
parts = decode_header(s)
result = []
for part, charset in parts:
if isinstance(part, bytes):
charset = charset or "utf-8"
try:
result.append(part.decode(charset, errors="replace"))
except Exception:
result.append(part.decode("utf-8", errors="replace"))
else:
result.append(str(part))
return "".join(result)
def extract_email_body(msg) -> str:
"""从邮件对象中提取正文文本"""
body = ""
if msg.is_multipart():
for part in msg.walk():
content_type = part.get_content_type()
disposition = str(part.get("Content-Disposition", ""))
if "attachment" in disposition:
continue
if content_type == "text/plain":
payload = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
try:
body = payload.decode(charset, errors="replace")
break
except Exception:
body = payload.decode("utf-8", errors="replace")
break
elif content_type == "text/html" and not body:
payload = part.get_payload(decode=True)
charset = part.get_content_charset() or "utf-8"
try:
html = payload.decode(charset, errors="replace")
except Exception:
html = payload.decode("utf-8", errors="replace")
extractor = HTMLTextExtractor()
extractor.feed(html)
body = extractor.get_text()
else:
payload = msg.get_payload(decode=True)
if payload:
charset = msg.get_content_charset() or "utf-8"
try:
body = payload.decode(charset, errors="replace")
except Exception:
body = payload.decode("utf-8", errors="replace")
# 清理引用内容Re: 时的原文引用)
body = re.sub(r"\n>.*", "", body)
body = re.sub(r"\n-{3,}.*?原始邮件.*?\n", "\n", body, flags=re.DOTALL)
body = re.sub(r"\n_{3,}\n.*", "", body, flags=re.DOTALL)
return body.strip()
def is_from_target(from_field: str, target: str) -> bool:
"""判断邮件是否来自目标人"""
from_str = decode_mime_str(from_field).lower()
target_lower = target.lower()
return target_lower in from_str
def parse_eml_file(file_path: str, target: str) -> list[dict]:
"""解析单个 .eml 文件"""
with open(file_path, "rb") as f:
msg = email.message_from_binary_file(f, policy=email.policy.default)
from_field = str(msg.get("From", ""))
if not is_from_target(from_field, target):
return []
subject = decode_mime_str(str(msg.get("Subject", "")))
date = str(msg.get("Date", ""))
body = extract_email_body(msg)
if not body:
return []
return [{
"from": decode_mime_str(from_field),
"subject": subject,
"date": date,
"body": body,
}]
def parse_mbox_file(file_path: str, target: str) -> list[dict]:
"""解析 .mbox 文件(多封邮件合集)"""
results = []
mbox = mailbox.mbox(file_path)
for msg in mbox:
from_field = str(msg.get("From", ""))
if not is_from_target(from_field, target):
continue
subject = decode_mime_str(str(msg.get("Subject", "")))
date = str(msg.get("Date", ""))
body = extract_email_body(msg)
if not body:
continue
results.append({
"from": decode_mime_str(from_field),
"subject": subject,
"date": date,
"body": body,
})
return results
def parse_txt_file(file_path: str, target: str) -> list[dict]:
"""
解析纯文本格式的邮件记录
支持简单的分隔格式:
From: xxx
Subject: xxx
Date: xxx
---
正文内容
===
"""
results = []
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
# 尝试按分隔符切割多封邮件
emails_raw = re.split(r"\n={3,}\n|\n-{3,}\n(?=From:)", content)
for raw in emails_raw:
from_match = re.search(r"^From:\s*(.+)$", raw, re.MULTILINE)
subject_match = re.search(r"^Subject:\s*(.+)$", raw, re.MULTILINE)
date_match = re.search(r"^Date:\s*(.+)$", raw, re.MULTILINE)
from_field = from_match.group(1).strip() if from_match else ""
if not is_from_target(from_field, target):
continue
# 提取正文(去掉头部字段后的内容)
body = re.sub(r"^(From|To|Subject|Date|CC|BCC):.*\n?", "", raw, flags=re.MULTILINE)
body = body.strip()
if not body:
continue
results.append({
"from": from_field,
"subject": subject_match.group(1).strip() if subject_match else "",
"date": date_match.group(1).strip() if date_match else "",
"body": body,
})
return results
def classify_emails(emails: list[dict]) -> dict:
"""
对邮件按内容分类:
- 长邮件(正文 > 200 字):技术方案、观点陈述
- 决策类:包含明确判断的邮件
- 日常沟通:短邮件
"""
long_emails = []
decision_emails = []
daily_emails = []
decision_keywords = [
"同意", "不同意", "建议", "方案", "觉得", "应该", "决定", "确认",
"approve", "reject", "lgtm", "suggest", "recommend", "think",
"我的看法", "我认为", "我觉得", "需要", "必须", "不需要"
]
for e in emails:
body = e["body"]
if len(body) > 200:
long_emails.append(e)
elif any(kw in body.lower() for kw in decision_keywords):
decision_emails.append(e)
else:
daily_emails.append(e)
return {
"long_emails": long_emails,
"decision_emails": decision_emails,
"daily_emails": daily_emails,
"total_count": len(emails),
}
def format_output(target: str, classified: dict) -> str:
"""格式化输出,供 AI 分析使用"""
lines = [
f"# 邮件提取结果",
f"目标人物:{target}",
f"总邮件数:{classified['total_count']}",
"",
"---",
"",
"## 长邮件(技术方案/观点类,权重最高)",
"",
]
for e in classified["long_emails"]:
lines.append(f"**主题:{e['subject']}** [{e['date']}]")
lines.append(e["body"])
lines.append("")
lines.append("---")
lines.append("")
lines += [
"## 决策类邮件",
"",
]
for e in classified["decision_emails"]:
lines.append(f"**主题:{e['subject']}** [{e['date']}]")
lines.append(e["body"])
lines.append("")
lines += [
"---",
"",
"## 日常沟通(风格参考)",
"",
]
for e in classified["daily_emails"][:30]:
lines.append(f"**{e['subject']}**{e['body'][:200]}")
lines.append("")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="解析邮件文件,提取目标人发出的邮件")
parser.add_argument("--file", required=True, help="输入文件路径(.eml / .mbox / .txt")
parser.add_argument("--target", required=True, help="目标人物(邮箱地址或姓名)")
parser.add_argument("--output", default=None, help="输出文件路径(默认打印到 stdout")
args = parser.parse_args()
file_path = Path(args.file)
if not file_path.exists():
print(f"错误:文件不存在 {file_path}", file=sys.stderr)
sys.exit(1)
suffix = file_path.suffix.lower()
if suffix == ".eml":
emails = parse_eml_file(str(file_path), args.target)
elif suffix == ".mbox":
emails = parse_mbox_file(str(file_path), args.target)
else:
emails = parse_txt_file(str(file_path), args.target)
if not emails:
print(f"警告:未找到来自 '{args.target}' 的邮件", file=sys.stderr)
print("提示:请检查目标名称/邮箱是否与文件中的 From 字段一致", file=sys.stderr)
classified = classify_emails(emails)
output = format_output(args.target, classified)
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
f.write(output)
print(f"已输出到 {args.output},共 {len(emails)} 封邮件")
else:
print(output)
if __name__ == "__main__":
main()

tools/feishu_auto_collector.py Normal file

@@ -0,0 +1,605 @@
#!/usr/bin/env python3
"""
飞书自动采集器
输入同事姓名,自动:
1. 搜索飞书用户,获取 user_id
2. 找到与他共同的群聊,拉取他的消息记录
3. 搜索他创建/编辑的文档和 Wiki
4. 拉取文档内容
5. 拉取多维表格(如有)
6. 输出统一格式,直接进 colleague-creator 分析流程
前置:
python3 feishu_auto_collector.py --setup # 配置 App ID / Secret(一次性)
用法:
python3 feishu_auto_collector.py --name "张三" --output-dir ./knowledge/zhangsan
python3 feishu_auto_collector.py --name "张三" --msg-limit 1000 --doc-limit 20
"""
from __future__ import annotations
import json
import sys
import time
import argparse
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional
try:
import requests
except ImportError:
print("错误:请先安装 requestspip3 install requests", file=sys.stderr)
sys.exit(1)
CONFIG_PATH = Path.home() / ".colleague-skill" / "feishu_config.json"
BASE_URL = "https://open.feishu.cn/open-apis"
# ─── 配置 ────────────────────────────────────────────────────────────────────
def load_config() -> dict:
if not CONFIG_PATH.exists():
print("未找到配置请先运行python3 feishu_auto_collector.py --setup", file=sys.stderr)
sys.exit(1)
return json.loads(CONFIG_PATH.read_text())
def save_config(config: dict) -> None:
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
CONFIG_PATH.write_text(json.dumps(config, indent=2, ensure_ascii=False))
def setup_config() -> None:
print("=== 飞书自动采集配置 ===\n")
print("请前往 https://open.feishu.cn 创建企业自建应用,开通以下权限:")
print()
print(" 消息类:")
print(" im:message:readonly 读取消息")
print(" im:chat:readonly 读取群聊信息")
print(" im:chat.members:readonly 读取群成员")
print()
print(" 用户类:")
print(" contact:user.base:readonly 搜索用户")
print()
print(" 文档类:")
print(" docs:doc:readonly 读取文档")
print(" wiki:wiki:readonly 读取知识库")
print(" drive:drive:readonly 搜索云盘文件")
print()
print(" 多维表格:")
print(" bitable:app:readonly 读取多维表格")
print()
app_id = input("App ID (cli_xxx): ").strip()
app_secret = input("App Secret: ").strip()
config = {"app_id": app_id, "app_secret": app_secret}
save_config(config)
print(f"\n✅ 配置已保存到 {CONFIG_PATH}")
# ─── Token ───────────────────────────────────────────────────────────────────
_token_cache: dict = {}
def get_tenant_token(config: dict) -> str:
"""获取 tenant_access_token带缓存有效期约 2 小时)"""
now = time.time()
if _token_cache.get("token") and _token_cache.get("expire", 0) > now + 60:
return _token_cache["token"]
resp = requests.post(
f"{BASE_URL}/auth/v3/tenant_access_token/internal",
json={"app_id": config["app_id"], "app_secret": config["app_secret"]},
timeout=10,
)
data = resp.json()
if data.get("code") != 0:
print(f"获取 token 失败:{data}", file=sys.stderr)
sys.exit(1)
token = data["tenant_access_token"]
_token_cache["token"] = token
_token_cache["expire"] = now + data.get("expire", 7200)
return token
def api_get(path: str, params: dict, config: dict) -> dict:
token = get_tenant_token(config)
resp = requests.get(
f"{BASE_URL}{path}",
params=params,
headers={"Authorization": f"Bearer {token}"},
timeout=15,
)
return resp.json()
def api_post(path: str, body: dict, config: dict) -> dict:
token = get_tenant_token(config)
resp = requests.post(
f"{BASE_URL}{path}",
json=body,
headers={"Authorization": f"Bearer {token}"},
timeout=15,
)
return resp.json()
# ─── 用户搜索 ─────────────────────────────────────────────────────────────────
def find_user(name: str, config: dict) -> Optional[dict]:
"""通过姓名搜索飞书用户"""
print(f" 搜索用户:{name} ...", file=sys.stderr)
data = api_get(
"/search/v1/user",
{"query": name, "page_size": 10},
config,
)
if data.get("code") != 0:
print(f" 搜索用户失败code={data.get('code')}{data.get('msg')}", file=sys.stderr)
return None
users = data.get("data", {}).get("results", [])
if not users:
print(f" 未找到用户:{name}", file=sys.stderr)
return None
if len(users) == 1:
u = users[0]
print(f" 找到用户:{u.get('name')}{u.get('department_path', [''])[0]}", file=sys.stderr)
return u
# 多个结果,让用户选择
print(f"\n 找到 {len(users)} 个结果,请选择:")
for i, u in enumerate(users):
dept = u.get("department_path", [""])
dept_str = dept[0] if dept else ""
print(f" [{i+1}] {u.get('name')} {dept_str} {u.get('user_id', '')}")
choice = input("\n 选择编号(默认 1):").strip() or "1"
try:
idx = int(choice) - 1
return users[idx]
except (ValueError, IndexError):
return users[0]
# ─── 消息记录 ─────────────────────────────────────────────────────────────────
def get_chats_with_user(user_open_id: str, config: dict) -> list:
"""找到 bot 和目标用户共同在的群聊"""
print(" 获取群聊列表 ...", file=sys.stderr)
chats = []
page_token = None
while True:
params = {"page_size": 100}
if page_token:
params["page_token"] = page_token
data = api_get("/im/v1/chats", params, config)
if data.get("code") != 0:
print(f" 获取群聊失败:{data.get('msg')}", file=sys.stderr)
break
items = data.get("data", {}).get("items", [])
chats.extend(items)
if not data.get("data", {}).get("has_more"):
break
page_token = data.get("data", {}).get("page_token")
print(f"{len(chats)} 个群聊,检查成员 ...", file=sys.stderr)
# 过滤:目标用户在其中的群
result = []
for chat in chats:
chat_id = chat.get("chat_id")
if not chat_id:
continue
members_data = api_get(
f"/im/v1/chats/{chat_id}/members",
{"page_size": 100},
config,
)
members = members_data.get("data", {}).get("items", [])
for m in members:
if m.get("member_id") == user_open_id or m.get("open_id") == user_open_id:
result.append(chat)
print(f"{chat.get('name', chat_id)}", file=sys.stderr)
break
return result
def fetch_messages_from_chat(
chat_id: str,
user_open_id: str,
limit: int,
config: dict,
) -> list:
"""从指定群聊拉取目标用户的消息"""
messages = []
page_token = None
while len(messages) < limit:
params = {
"container_id_type": "chat",
"container_id": chat_id,
"page_size": 50,
"sort_type": "ByCreateTimeDesc",
}
if page_token:
params["page_token"] = page_token
data = api_get("/im/v1/messages", params, config)
if data.get("code") != 0:
break
items = data.get("data", {}).get("items", [])
if not items:
break
for item in items:
sender = item.get("sender", {})
sender_id = sender.get("id") or sender.get("open_id", "")
if sender_id != user_open_id:
continue
# 解析消息内容
content_raw = item.get("body", {}).get("content", "")
try:
content_obj = json.loads(content_raw)
# 富文本消息
if isinstance(content_obj, dict):
text_parts = []
for line in content_obj.get("content", []):
for seg in line:
if seg.get("tag") in ("text", "a"):
text_parts.append(seg.get("text", ""))
content = " ".join(text_parts)
else:
content = str(content_obj)
except Exception:
content = content_raw
content = content.strip()
if not content or content in ("[图片]", "[文件]", "[表情]", "[语音]"):
continue
ts = item.get("create_time", "")
if ts:
try:
ts = datetime.fromtimestamp(int(ts) / 1000).strftime("%Y-%m-%d %H:%M")
except Exception:
pass
messages.append({"content": content, "time": ts})
if not data.get("data", {}).get("has_more"):
break
page_token = data.get("data", {}).get("page_token")
return messages[:limit]
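# Illustrative rich-text ("post") body that the parsing loop above flattens
# (a sketch; real Feishu payloads carry more fields):
#   {"title": "", "content": [[{"tag": "text", "text": "今晚发布"},
#                              {"tag": "a", "text": "发布文档", "href": "https://..."}]]}
#   -> "今晚发布 发布文档"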
def collect_messages(
user: dict,
msg_limit: int,
config: dict,
) -> str:
"""采集目标用户的所有消息记录"""
user_open_id = user.get("open_id") or user.get("user_id", "")
name = user.get("name", "")
chats = get_chats_with_user(user_open_id, config)
if not chats:
return f"# 消息记录\n\n未找到与 {name} 共同的群聊(请确认 bot 已被添加到相关群)\n"
all_messages = []
per_chat_limit = max(100, msg_limit // len(chats))
for chat in chats:
chat_id = chat.get("chat_id")
chat_name = chat.get("name", chat_id)
print(f" 拉取「{chat_name}」消息 ...", file=sys.stderr)
msgs = fetch_messages_from_chat(chat_id, user_open_id, per_chat_limit, config)
for m in msgs:
m["chat"] = chat_name
all_messages.extend(msgs)
print(f" 获取 {len(msgs)}", file=sys.stderr)
# 分类输出
long_msgs = [m for m in all_messages if len(m.get("content", "")) > 50]
short_msgs = [m for m in all_messages if len(m.get("content", "")) <= 50]
lines = [
f"# 飞书消息记录(自动采集)",
f"目标:{name}",
f"来源群聊:{', '.join(c.get('name', '') for c in chats)}",
f"{len(all_messages)} 条消息",
"",
"---",
"",
"## 长消息(观点/决策/技术类)",
"",
]
for m in long_msgs:
lines.append(f"[{m.get('time', '')}][{m.get('chat', '')}] {m['content']}")
lines.append("")
lines += ["---", "", "## 日常消息(风格参考)", ""]
for m in short_msgs[:300]:
lines.append(f"[{m.get('time', '')}] {m['content']}")
return "\n".join(lines)
# ─── 文档采集 ─────────────────────────────────────────────────────────────────
def search_docs_by_user(user_open_id: str, name: str, doc_limit: int, config: dict) -> list:
"""搜索目标用户创建或编辑的文档"""
print(f" 搜索 {name} 的文档 ...", file=sys.stderr)
data = api_post(
"/search/v2/message",
{
"query": name,
"search_type": "docs",
"docs_options": {
"creator_ids": [user_open_id],
},
"page_size": doc_limit,
},
config,
)
if data.get("code") != 0:
# fallback:用关键词搜索
print(f" 按创建人搜索失败,改用关键词搜索 ...", file=sys.stderr)
data = api_post(
"/search/v2/message",
{
"query": name,
"search_type": "docs",
"page_size": doc_limit,
},
config,
)
docs = []
for item in data.get("data", {}).get("results", []):
doc_info = item.get("docs_info", {})
if doc_info:
docs.append({
"title": doc_info.get("title", ""),
"url": doc_info.get("url", ""),
"type": doc_info.get("docs_type", ""),
"creator": doc_info.get("creator", {}).get("name", ""),
})
print(f" 找到 {len(docs)} 篇文档", file=sys.stderr)
return docs
def fetch_doc_content(doc_token: str, doc_type: str, config: dict) -> str:
"""拉取单篇文档内容"""
if doc_type in ("doc", "docx"):
data = api_get(f"/docx/v1/documents/{doc_token}/raw_content", {}, config)
return data.get("data", {}).get("content", "")
elif doc_type == "wiki":
# 先获取 wiki node 信息
node_data = api_get(f"/wiki/v2/spaces/get_node", {"token": doc_token}, config)
obj_token = node_data.get("data", {}).get("node", {}).get("obj_token", doc_token)
obj_type = node_data.get("data", {}).get("node", {}).get("obj_type", "docx")
return fetch_doc_content(obj_token, obj_type, config)
return ""
def collect_docs(user: dict, doc_limit: int, config: dict) -> str:
"""采集目标用户的文档"""
import re
user_open_id = user.get("open_id") or user.get("user_id", "")
name = user.get("name", "")
docs = search_docs_by_user(user_open_id, name, doc_limit, config)
if not docs:
return f"# 文档内容\n\n未找到 {name} 相关文档\n"
lines = [
f"# 文档内容(自动采集)",
f"目标:{name}",
f"{len(docs)}",
"",
]
for doc in docs:
url = doc.get("url", "")
title = doc.get("title", "无标题")
doc_type = doc.get("type", "")
print(f" 拉取文档:{title} ...", file=sys.stderr)
# 从 URL 提取 token
token_match = re.search(r"/(?:wiki|docx|docs|sheets|base)/([A-Za-z0-9]+)", url)
if not token_match:
continue
doc_token = token_match.group(1)
content = fetch_doc_content(doc_token, doc_type or "docx", config)
if not content or len(content.strip()) < 20:
print(f" 内容为空,跳过", file=sys.stderr)
continue
lines += [
f"---",
f"## 《{title}",
f"链接:{url}",
f"创建人:{doc.get('creator', '')}",
"",
content.strip(),
"",
]
return "\n".join(lines)
# ─── 多维表格 ─────────────────────────────────────────────────────────────────
def collect_bitable(app_token: str, config: dict) -> str:
"""拉取多维表格内容"""
# 获取所有 table
data = api_get(f"/bitable/v1/apps/{app_token}/tables", {"page_size": 100}, config)
tables = data.get("data", {}).get("items", [])
if not tables:
return "(多维表格为空)\n"
lines = []
for table in tables:
table_id = table.get("table_id")
table_name = table.get("name", table_id)
# 获取字段
fields_data = api_get(
f"/bitable/v1/apps/{app_token}/tables/{table_id}/fields",
{"page_size": 100},
config,
)
fields = [f.get("field_name", "") for f in fields_data.get("data", {}).get("items", [])]
# 获取记录
records_data = api_get(
f"/bitable/v1/apps/{app_token}/tables/{table_id}/records",
{"page_size": 100},
config,
)
records = records_data.get("data", {}).get("items", [])
lines.append(f"### 表:{table_name}")
lines.append("")
lines.append("| " + " | ".join(fields) + " |")
lines.append("| " + " | ".join(["---"] * len(fields)) + " |")
for rec in records:
row_data = rec.get("fields", {})
row = []
for f in fields:
val = row_data.get(f, "")
if isinstance(val, list):
val = " ".join(
v.get("text", str(v)) if isinstance(v, dict) else str(v)
for v in val
)
row.append(str(val).replace("|", "").replace("\n", " "))
lines.append("| " + " | ".join(row) + " |")
lines.append("")
return "\n".join(lines)
# ─── 主流程 ───────────────────────────────────────────────────────────────────
def collect_all(
name: str,
output_dir: Path,
msg_limit: int,
doc_limit: int,
config: dict,
) -> dict:
"""采集某同事的所有可用数据,输出到 output_dir"""
output_dir.mkdir(parents=True, exist_ok=True)
results = {}
print(f"\n🔍 开始采集:{name}\n", file=sys.stderr)
# Step 1: 搜索用户
user = find_user(name, config)
if not user:
print(f"❌ 未找到用户 {name},请检查姓名是否正确", file=sys.stderr)
sys.exit(1)
# Step 2: 采集消息记录
print(f"\n📨 采集消息记录(上限 {msg_limit} 条)...", file=sys.stderr)
try:
msg_content = collect_messages(user, msg_limit, config)
msg_path = output_dir / "messages.txt"
msg_path.write_text(msg_content, encoding="utf-8")
results["messages"] = str(msg_path)
print(f" ✅ 消息记录 → {msg_path}", file=sys.stderr)
except Exception as e:
print(f" ⚠️ 消息采集失败:{e}", file=sys.stderr)
# Step 3: 采集文档
print(f"\n📄 采集文档(上限 {doc_limit} 篇)...", file=sys.stderr)
try:
doc_content = collect_docs(user, doc_limit, config)
doc_path = output_dir / "docs.txt"
doc_path.write_text(doc_content, encoding="utf-8")
results["docs"] = str(doc_path)
print(f" ✅ 文档内容 → {doc_path}", file=sys.stderr)
except Exception as e:
print(f" ⚠️ 文档采集失败:{e}", file=sys.stderr)
# 写摘要
summary = {
"name": name,
"user_id": user.get("user_id", ""),
"open_id": user.get("open_id", ""),
"department": user.get("department_path", []),
"collected_at": datetime.now(timezone.utc).isoformat(),
"files": results,
}
(output_dir / "collection_summary.json").write_text(
json.dumps(summary, ensure_ascii=False, indent=2)
)
print(f"\n✅ 采集完成,输出目录:{output_dir}", file=sys.stderr)
return results
def main() -> None:
parser = argparse.ArgumentParser(description="飞书数据自动采集器")
parser.add_argument("--setup", action="store_true", help="初始化配置")
parser.add_argument("--name", help="同事姓名")
parser.add_argument("--output-dir", default=None, help="输出目录(默认 ./knowledge/{name}")
parser.add_argument("--msg-limit", type=int, default=1000, help="最多采集消息条数(默认 1000")
parser.add_argument("--doc-limit", type=int, default=20, help="最多采集文档篇数(默认 20")
args = parser.parse_args()
if args.setup:
setup_config()
return
if not args.name:
parser.error("请提供 --name")
config = load_config()
output_dir = Path(args.output_dir) if args.output_dir else Path(f"./knowledge/{args.name}")
collect_all(
name=args.name,
output_dir=output_dir,
msg_limit=args.msg_limit,
doc_limit=args.doc_limit,
config=config,
)
if __name__ == "__main__":
main()

374 tools/feishu_browser.py Normal file

@@ -0,0 +1,374 @@
#!/usr/bin/env python3
"""
飞书浏览器抓取器Playwright 方案)
复用本机 Chrome 登录态,无需任何 token,能访问你有权限的所有飞书内容。
支持:
- 飞书文档(docx/docs)
- 飞书知识库(wiki)
- 飞书表格(sheets)→ 导出为 CSV
- 飞书消息记录(指定群聊)
安装:
pip install playwright
playwright install chromium
用法:
python3 feishu_browser.py --url "https://xxx.feishu.cn/wiki/xxx" --output out.txt
python3 feishu_browser.py --url "https://xxx.feishu.cn/docx/xxx" --output out.txt
python3 feishu_browser.py --chat "后端组" --target "张三" --limit 500 --output out.txt
python3 feishu_browser.py --url "https://xxx.feishu.cn/sheets/xxx" --output out.csv
"""
from __future__ import annotations
import sys
import time
import json
import argparse
import platform
from pathlib import Path
from typing import Optional
def get_default_chrome_profile() -> str:
"""根据操作系统返回 Chrome 默认 Profile 路径"""
system = platform.system()
if system == "Darwin":
return str(Path.home() / "Library/Application Support/Google/Chrome/Default")
elif system == "Linux":
return str(Path.home() / ".config/google-chrome/Default")
elif system == "Windows":
import os
return str(Path(os.environ.get("LOCALAPPDATA", "")) / "Google/Chrome/User Data/Default")
return str(Path.home() / ".config/google-chrome/Default")
def make_context(playwright, chrome_profile: Optional[str], headless: bool):
"""创建复用登录态的浏览器上下文"""
profile = chrome_profile or get_default_chrome_profile()
try:
ctx = playwright.chromium.launch_persistent_context(
user_data_dir=profile,
headless=headless,
args=[
"--disable-blink-features=AutomationControlled",
"--no-first-run",
"--no-default-browser-check",
],
ignore_default_args=["--enable-automation"],
viewport={"width": 1280, "height": 900},
)
return ctx
except Exception as e:
print(f"⚠️ 无法加载 Chrome Profile{e}", file=sys.stderr)
print(f" 尝试的路径:{profile}", file=sys.stderr)
print(" 请用 --chrome-profile 手动指定路径", file=sys.stderr)
sys.exit(1)
def detect_page_type(url: str) -> str:
"""根据 URL 判断飞书页面类型"""
if "/wiki/" in url:
return "wiki"
elif "/docx/" in url or "/docs/" in url:
return "doc"
elif "/sheets/" in url or "/spreadsheets/" in url:
return "sheet"
elif "/base/" in url:
return "base"
else:
return "unknown"
def fetch_doc(page, url: str) -> str:
"""抓取飞书文档或 Wiki 的文本内容"""
page.goto(url, wait_until="domcontentloaded", timeout=30000)
# 等待编辑器加载(飞书文档渲染较慢)
selectors = [
".docs-reader-content",
".lark-editor-content",
"[data-block-type]",
".doc-render-core",
".wiki-content",
".node-doc-content",
]
loaded = False
for sel in selectors:
try:
page.wait_for_selector(sel, timeout=15000)
loaded = True
break
except Exception:
continue
if not loaded:
# 等待一段时间后直接提取 body 文本
time.sleep(5)
# 额外等待异步内容渲染
time.sleep(2)
# 尝试多个选择器提取正文
for sel in selectors:
try:
el = page.query_selector(sel)
if el:
text = el.inner_text()
if len(text.strip()) > 50:
return text.strip()
except Exception:
continue
# fallback提取整个 body
text = page.inner_text("body")
return text.strip()
def fetch_sheet(page, url: str) -> str:
"""抓取飞书表格,转为 CSV 格式"""
page.goto(url, wait_until="domcontentloaded", timeout=30000)
try:
page.wait_for_selector(".spreadsheet-container, .sheet-container", timeout=15000)
except Exception:
time.sleep(5)
time.sleep(3)
# 通过 JS 提取表格数据
data = page.evaluate("""
() => {
const rows = [];
// 尝试从 DOM 提取可见单元格
const cells = document.querySelectorAll('[data-row][data-col]');
if (cells.length === 0) return null;
const grid = {};
let maxRow = 0, maxCol = 0;
cells.forEach(cell => {
const r = parseInt(cell.getAttribute('data-row'));
const c = parseInt(cell.getAttribute('data-col'));
if (!grid[r]) grid[r] = {};
grid[r][c] = cell.innerText.replace(/\\n/g, ' ').trim();
maxRow = Math.max(maxRow, r);
maxCol = Math.max(maxCol, c);
});
for (let r = 0; r <= maxRow; r++) {
const row = [];
for (let c = 0; c <= maxCol; c++) {
row.push(grid[r] && grid[r][c] ? grid[r][c] : '');
}
rows.push(row);
}
return rows;
}
""")
if data:
lines = []
for row in data:
lines.append(",".join(f'"{cell}"' for cell in row))
return "\n".join(lines)
# fallback直接提取文本
return page.inner_text("body")
def fetch_messages(page, chat_name: str, target_name: str, limit: int = 500) -> str:
"""
抓取指定群聊中目标人物的消息记录。
需要先导航到飞书 Web 版消息页面。
"""
# 打开飞书消息页
page.goto("https://applink.feishu.cn/client/chat/open", wait_until="domcontentloaded", timeout=20000)
time.sleep(3)
# 尝试搜索群聊
try:
# 点击搜索
search_btn = page.query_selector('[data-test-id="search-btn"], .search-button, [placeholder*="搜索"]')
if search_btn:
search_btn.click()
time.sleep(1)
page.keyboard.type(chat_name)
time.sleep(2)
# 选择第一个结果
result = page.query_selector('.search-result-item:first-child, .im-search-item:first-child')
if result:
result.click()
time.sleep(2)
except Exception as e:
print(f"⚠️ 自动搜索群聊失败:{e}", file=sys.stderr)
print(f" 请手动导航到「{chat_name}」群聊,然后按回车继续...", file=sys.stderr)
input()
# 向上滚动加载历史消息
print(f"正在加载消息历史...", file=sys.stderr)
messages_container = page.query_selector('.message-list, .im-message-list, [data-testid="message-list"]')
if messages_container:
for _ in range(10): # 滚动 10 次
page.evaluate("el => el.scrollTop = 0", messages_container)
time.sleep(1.5)
else:
for _ in range(10):
page.keyboard.press("Control+Home")
time.sleep(1.5)
time.sleep(2)
# 提取消息
messages = page.evaluate(f"""
() => {{
const target = "{target_name}";
const results = [];
// 常见的消息 DOM 结构
const msgSelectors = [
'.message-item',
'.im-message-item',
'[data-message-id]',
'.msg-list-item',
];
let items = [];
for (const sel of msgSelectors) {{
items = document.querySelectorAll(sel);
if (items.length > 0) break;
}}
items.forEach(item => {{
const senderEl = item.querySelector(
'.sender-name, .message-sender, [data-testid="sender-name"], .name'
);
const contentEl = item.querySelector(
'.message-content, .msg-content, [data-testid="message-content"], .text-content'
);
const timeEl = item.querySelector(
'.message-time, .msg-time, [data-testid="message-time"], .time'
);
const sender = senderEl ? senderEl.innerText.trim() : '';
const content = contentEl ? contentEl.innerText.trim() : '';
const time = timeEl ? timeEl.innerText.trim() : '';
if (!content) return;
if (target && !sender.includes(target)) return;
results.push({{ sender, content, time }});
}});
return results.slice(-{limit});
}}
""")
if not messages:
print("⚠️ 未能自动提取消息,尝试提取页面文本", file=sys.stderr)
return page.inner_text("body")
# 按权重分类输出
long_msgs = [m for m in messages if len(m.get("content", "")) > 50]
short_msgs = [m for m in messages if len(m.get("content", "")) <= 50]
lines = [
f"# 飞书消息记录(浏览器抓取)",
f"群聊:{chat_name}",
f"目标人物:{target_name}",
f"{len(messages)} 条消息",
"",
"---",
"",
"## 长消息(观点/决策类)",
"",
]
for m in long_msgs:
lines.append(f"[{m.get('time', '')}] {m.get('content', '')}")
lines.append("")
lines += ["---", "", "## 日常消息", ""]
for m in short_msgs[:200]:
lines.append(f"[{m.get('time', '')}] {m.get('content', '')}")
return "\n".join(lines)
def main() -> None:
parser = argparse.ArgumentParser(description="飞书浏览器抓取器(复用 Chrome 登录态)")
parser.add_argument("--url", help="飞书文档/Wiki/表格链接")
parser.add_argument("--chat", help="群聊名称(抓取消息记录时使用)")
parser.add_argument("--target", help="目标人物姓名(只提取此人的消息)")
parser.add_argument("--limit", type=int, default=500, help="最多抓取消息条数(默认 500")
parser.add_argument("--output", default=None, help="输出文件路径(默认打印到 stdout")
parser.add_argument("--chrome-profile", default=None, help="Chrome Profile 路径(默认自动检测)")
parser.add_argument("--headless", action="store_true", help="无头模式(不显示浏览器窗口)")
parser.add_argument("--show-browser", action="store_true", help="显示浏览器窗口(调试用)")
args = parser.parse_args()
if not args.url and not args.chat:
parser.error("请提供 --url文档链接或 --chat群聊名称")
try:
from playwright.sync_api import sync_playwright
except ImportError:
print("错误:请先安装 Playwrightpip install playwright && playwright install chromium", file=sys.stderr)
sys.exit(1)
headless = args.headless and not args.show_browser
print(f"启动浏览器({'无头' if headless else '有界面'}模式)...", file=sys.stderr)
with sync_playwright() as p:
ctx = make_context(p, args.chrome_profile, headless=headless)
page = ctx.new_page()
# 检查是否已登录
page.goto("https://www.feishu.cn", wait_until="domcontentloaded", timeout=15000)
time.sleep(2)
if "login" in page.url.lower() or "signin" in page.url.lower():
print("⚠️ 检测到未登录状态。", file=sys.stderr)
print(" 请在打开的浏览器窗口中登录飞书,登录后按回车继续...", file=sys.stderr)
if headless:
print(" 提示:请用 --show-browser 参数显示浏览器窗口以完成登录", file=sys.stderr)
sys.exit(1)
input()
# 根据任务类型执行
if args.url:
page_type = detect_page_type(args.url)
print(f"页面类型:{page_type},开始抓取...", file=sys.stderr)
if page_type == "sheet":
content = fetch_sheet(page, args.url)
else:
content = fetch_doc(page, args.url)
elif args.chat:
content = fetch_messages(
page,
chat_name=args.chat,
target_name=args.target or "",
limit=args.limit,
)
ctx.close()
if not content or len(content.strip()) < 10:
print("⚠️ 未能提取到有效内容", file=sys.stderr)
sys.exit(1)
if args.output:
Path(args.output).write_text(content, encoding="utf-8")
print(f"✅ 已保存到 {args.output}{len(content)} 字符)", file=sys.stderr)
else:
print(content)
if __name__ == "__main__":
main()

310 tools/feishu_mcp_client.py Normal file

@@ -0,0 +1,310 @@
#!/usr/bin/env python3
"""
飞书 MCP 客户端封装(cso1z/Feishu-MCP 方案)
通过 Feishu MCP Server 读取文档、wiki、消息记录。
适合:公司已授权的文档、有 App token 权限的内容。
前置要求:
1. 安装 Feishu MCP:npm install -g feishu-mcp
2. 配置 App ID 和 App Secret(飞书开放平台创建企业自建应用)
3. 给应用开通必要权限(见下方权限列表)
权限列表(飞书开放平台 → 权限管理 → 开通):
- docs:doc:readonly 读取文档
- wiki:wiki:readonly 读取知识库
- im:message:readonly 读取消息
- bitable:app:readonly 读取多维表格
- sheets:spreadsheet:readonly 读取表格
用法:
# 配置 token(一次性)
python3 feishu_mcp_client.py --setup
# 读取文档
python3 feishu_mcp_client.py --url "https://xxx.feishu.cn/wiki/xxx" --output out.txt
# 读取消息记录
python3 feishu_mcp_client.py --chat-id "oc_xxx" --target "张三" --output out.txt
# 列出某空间下的所有文档
python3 feishu_mcp_client.py --list-wiki --space-id "xxx"
"""
from __future__ import annotations
import os
import sys
import json
import argparse
import subprocess
from pathlib import Path
from typing import Optional
CONFIG_PATH = Path.home() / ".colleague-skill" / "feishu_config.json"
# ─── 配置管理 ────────────────────────────────────────────────────────────────
def load_config() -> dict:
if CONFIG_PATH.exists():
return json.loads(CONFIG_PATH.read_text())
return {}
def save_config(config: dict) -> None:
CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
CONFIG_PATH.write_text(json.dumps(config, indent=2))
print(f"配置已保存到 {CONFIG_PATH}")
def setup_config() -> None:
print("=== 飞书 MCP 配置 ===")
print("请前往飞书开放平台open.feishu.cn创建企业自建应用获取以下信息\n")
app_id = input("App ID (cli_xxx): ").strip()
app_secret = input("App Secret: ").strip()
print("\n配置方式选择:")
print(" [1] App Token应用权限需要在飞书后台开通对应权限")
print(" [2] User Token个人权限能访问你本人有权限的所有内容需要定期刷新")
mode = input("选择 [1/2],默认 1").strip() or "1"
config = {
"app_id": app_id,
"app_secret": app_secret,
"mode": "app" if mode == "1" else "user",
}
if mode == "2":
print("\n获取 User Token飞书开放平台 → OAuth 2.0 → 获取 user_access_token")
user_token = input("User Access Token (u-xxx)").strip()
config["user_token"] = user_token
print("注意User Token 有效期约 2 小时,过期后需要重新配置")
save_config(config)
print("\n✅ 配置完成!")
# ─── MCP 调用封装 ─────────────────────────────────────────────────────────────
def call_mcp(tool: str, params: dict, config: dict) -> dict:
"""
通过 npx 调用 feishu-mcp 工具。
feishu-mcp 支持 stdio 模式,直接 JSON 通信。
"""
env = os.environ.copy()
env["FEISHU_APP_ID"] = config.get("app_id", "")
env["FEISHU_APP_SECRET"] = config.get("app_secret", "")
if config.get("mode") == "user" and config.get("user_token"):
env["FEISHU_USER_ACCESS_TOKEN"] = config["user_token"]
payload = json.dumps({
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": tool,
"arguments": params,
},
"id": 1,
})
try:
result = subprocess.run(
["npx", "-y", "feishu-mcp", "--stdio"],
input=payload,
capture_output=True,
text=True,
env=env,
timeout=30,
)
if result.returncode != 0:
raise RuntimeError(f"MCP 调用失败:{result.stderr}")
return json.loads(result.stdout)
except FileNotFoundError:
print("错误:未找到 npx请先安装 Node.js", file=sys.stderr)
print("安装 Feishu MCPnpm install -g feishu-mcp", file=sys.stderr)
sys.exit(1)
def extract_doc_token(url: str) -> tuple[str, str]:
"""从飞书 URL 中提取文档 token 和类型"""
import re
patterns = [
(r"/wiki/([A-Za-z0-9]+)", "wiki"),
(r"/docx/([A-Za-z0-9]+)", "docx"),
(r"/docs/([A-Za-z0-9]+)", "doc"),
(r"/sheets/([A-Za-z0-9]+)", "sheet"),
(r"/base/([A-Za-z0-9]+)", "base"),
]
for pattern, doc_type in patterns:
m = re.search(pattern, url)
if m:
return m.group(1), doc_type
raise ValueError(f"无法从 URL 解析文档 token{url}")
# ─── 功能函数 ─────────────────────────────────────────────────────────────────
def fetch_doc_via_mcp(url: str, config: dict) -> str:
"""通过 MCP 读取飞书文档或 Wiki"""
token, doc_type = extract_doc_token(url)
if doc_type == "wiki":
result = call_mcp("get_wiki_node", {"token": token}, config)
elif doc_type in ("docx", "doc"):
result = call_mcp("get_doc_content", {"doc_token": token}, config)
elif doc_type == "sheet":
result = call_mcp("get_spreadsheet_content", {"spreadsheet_token": token}, config)
else:
raise ValueError(f"不支持的文档类型:{doc_type}")
# 提取 MCP 返回的内容
if "result" in result:
content = result["result"]
if isinstance(content, list):
# MCP tool result 格式
for item in content:
if isinstance(item, dict) and item.get("type") == "text":
return item.get("text", "")
elif isinstance(content, str):
return content
elif "error" in result:
raise RuntimeError(f"MCP 返回错误:{result['error']}")
return json.dumps(result, ensure_ascii=False, indent=2)
def fetch_messages_via_mcp(
chat_id: str,
target_name: str,
limit: int,
config: dict,
) -> str:
"""通过 MCP 读取群聊消息记录"""
result = call_mcp(
"get_chat_messages",
{
"chat_id": chat_id,
"page_size": min(limit, 50), # 飞书 API 单次最多 50 条
},
config,
)
messages = []
raw = result.get("result", [])
if isinstance(raw, list):
messages = raw
elif isinstance(raw, str):
try:
messages = json.loads(raw)
except Exception:
return raw
# 过滤目标人物
if target_name:
messages = [
m for m in messages
if target_name in str(m.get("sender", {}).get("name", ""))
]
# 分类输出
long_msgs = [m for m in messages if len(str(m.get("content", ""))) > 50]
short_msgs = [m for m in messages if len(str(m.get("content", ""))) <= 50]
lines = [
"# 飞书消息记录MCP 方案)",
f"群聊 ID{chat_id}",
f"目标人物:{target_name or '全部'}",
f"{len(messages)}",
"",
"---",
"",
"## 长消息",
"",
]
for m in long_msgs:
sender = m.get("sender", {}).get("name", "")
content = m.get("content", "")
ts = m.get("create_time", "")
lines.append(f"[{ts}] {sender}{content}")
lines.append("")
lines += ["---", "", "## 日常消息", ""]
for m in short_msgs[:200]:
sender = m.get("sender", {}).get("name", "")
content = m.get("content", "")
lines.append(f"{sender}{content}")
return "\n".join(lines)
def list_wiki_docs(space_id: str, config: dict) -> str:
"""列出知识库空间下的所有文档"""
result = call_mcp("list_wiki_nodes", {"space_id": space_id}, config)
raw = result.get("result", "")
if isinstance(raw, str):
return raw
return json.dumps(raw, ensure_ascii=False, indent=2)
# ─── CLI ─────────────────────────────────────────────────────────────────────
def main() -> None:
parser = argparse.ArgumentParser(description="飞书 MCP 客户端")
parser.add_argument("--setup", action="store_true", help="初始化配置App ID / Secret")
parser.add_argument("--url", help="飞书文档/Wiki/表格链接")
parser.add_argument("--chat-id", help="群聊 IDoc_xxx 格式)")
parser.add_argument("--target", help="目标人物姓名")
parser.add_argument("--limit", type=int, default=500, help="最多获取消息数")
parser.add_argument("--list-wiki", action="store_true", help="列出知识库文档")
parser.add_argument("--space-id", help="知识库 Space ID")
parser.add_argument("--output", default=None, help="输出文件路径")
args = parser.parse_args()
if args.setup:
setup_config()
return
config = load_config()
if not config:
print("错误尚未配置请先运行python3 feishu_mcp_client.py --setup", file=sys.stderr)
sys.exit(1)
content = ""
if args.url:
print(f"通过 MCP 读取:{args.url}", file=sys.stderr)
content = fetch_doc_via_mcp(args.url, config)
elif args.chat_id:
print(f"通过 MCP 读取消息:{args.chat_id}", file=sys.stderr)
content = fetch_messages_via_mcp(
args.chat_id,
args.target or "",
args.limit,
config,
)
elif args.list_wiki:
if not args.space_id:
print("错误:--list-wiki 需要 --space-id", file=sys.stderr)
sys.exit(1)
content = list_wiki_docs(args.space_id, config)
else:
parser.print_help()
return
if args.output:
Path(args.output).write_text(content, encoding="utf-8")
print(f"✅ 已保存到 {args.output}", file=sys.stderr)
else:
print(content)
if __name__ == "__main__":
main()

251
tools/feishu_parser.py Normal file
View File

@@ -0,0 +1,251 @@
#!/usr/bin/env python3
"""
飞书消息导出 JSON 解析器
支持的导出格式:
1. 飞书官方导出(群聊记录):通常为 JSON 数组,每条消息包含 sender、content、timestamp
2. 手动整理的 TXT 格式(每行:时间 发送人:内容)
用法:
python feishu_parser.py --file messages.json --target "张三" --output output.txt
python feishu_parser.py --file messages.txt --target "张三" --output output.txt
"""
import json
import re
import sys
import argparse
from pathlib import Path
from datetime import datetime
def parse_feishu_json(file_path: str, target_name: str) -> list[dict]:
"""解析飞书官方导出的 JSON 格式消息"""
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
messages = []
# 兼容多种 JSON 结构
if isinstance(data, list):
raw_messages = data
elif isinstance(data, dict):
# 可能在 data.messages 或 data.records 等字段下
raw_messages = (
data.get("messages")
or data.get("records")
or data.get("data")
or []
)
else:
return []
for msg in raw_messages:
sender = (
msg.get("sender_name")
or msg.get("sender")
or msg.get("from")
or msg.get("user_name")
or ""
)
content = (
msg.get("content")
or msg.get("text")
or msg.get("message")
or msg.get("body")
or ""
)
timestamp = (
msg.get("timestamp")
or msg.get("create_time")
or msg.get("time")
or ""
)
# content 可能是嵌套结构
if isinstance(content, dict):
content = content.get("text") or content.get("content") or str(content)
if isinstance(content, list):
content = " ".join(
c.get("text", "") if isinstance(c, dict) else str(c)
for c in content
)
# 过滤:只保留目标人发送的消息
if target_name and target_name not in str(sender):
continue
# 过滤:跳过系统消息、表情包、撤回消息
if not content or content.strip() in ["[图片]", "[文件]", "[撤回了一条消息]", "[语音]"]:
continue
messages.append({
"sender": str(sender),
"content": str(content).strip(),
"timestamp": str(timestamp),
})
return messages
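# 可被 parse_feishu_json 解析的单条消息示例(示意,字段名以实际导出文件为准):
#   {"sender_name": "张三", "content": "这个方案可以先灰度", "timestamp": "2024-01-01 10:00"}
#   {"sender": "张三", "text": {"text": "同意"}, "create_time": 1704067200}
# 两种写法分别命中 sender_name/content/timestamp 与 sender/text/create_time 的兜底字段。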
def parse_feishu_txt(file_path: str, target_name: str) -> list[dict]:
"""解析手动整理的 TXT 格式消息(格式:时间 发送人:内容)"""
messages = []
with open(file_path, "r", encoding="utf-8") as f:
lines = f.readlines()
    # 匹配格式:2024-01-01 10:00 张三:消息内容(兼容全角/半角冒号)
    pattern = re.compile(
        r"^(?P<time>\d{4}[-/]\d{1,2}[-/]\d{1,2}[\s\d:]*)\s+(?P<sender>.+?)[::]\s*(?P<content>.+)$"
    )
for line in lines:
line = line.strip()
if not line:
continue
m = pattern.match(line)
if m:
sender = m.group("sender").strip()
content = m.group("content").strip()
timestamp = m.group("time").strip()
if target_name and target_name not in sender:
continue
if not content:
continue
messages.append({
"sender": sender,
"content": content,
"timestamp": timestamp,
})
else:
# 没有匹配格式,检查是否包含目标人名
if target_name and target_name in line:
messages.append({
"sender": target_name,
"content": line,
"timestamp": "",
})
return messages
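# 可被 parse_feishu_txt 匹配的行示例(示意):
#   2024-01-01 10:00 张三:这个方案我同意,下周先灰度
#   2024/1/2 张三:收到
# 时间在前、发送人与内容以冒号分隔;不符合格式但包含目标人名的行会整行兜底收录。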
def extract_key_content(messages: list[dict]) -> dict:
"""
对消息进行分类提取,区分:
    - 长消息(>50 字):可能包含观点、方案、技术判断
- 决策类回复:包含"同意""不行""觉得""建议"等关键词
- 日常沟通:其他消息
"""
long_messages = []
decision_messages = []
daily_messages = []
decision_keywords = [
"同意", "不行", "觉得", "建议", "应该", "不应该", "可以", "不可以",
"方案", "思路", "考虑", "决定", "确认", "拒绝", "推进", "暂缓",
"没问题", "有问题", "风险", "评估", "判断"
]
for msg in messages:
content = msg["content"]
if len(content) > 50:
long_messages.append(msg)
elif any(kw in content for kw in decision_keywords):
decision_messages.append(msg)
else:
daily_messages.append(msg)
return {
"long_messages": long_messages,
"decision_messages": decision_messages,
"daily_messages": daily_messages,
"total_count": len(messages),
}
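# 分类示意(消息内容为假设示例):
#   超过 50 字的技术方案说明             -> long_messages
#   "同意,按这个思路推进"(命中关键词)  -> decision_messages
#   "收到"                               -> daily_messages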
def format_output(target_name: str, extracted: dict) -> str:
"""格式化输出,供 AI 分析使用"""
lines = [
f"# 飞书消息提取结果",
f"目标人物:{target_name}",
f"总消息数:{extracted['total_count']}",
"",
"---",
"",
"## 长消息(观点/方案类,权重最高)",
"",
]
for msg in extracted["long_messages"]:
ts = f"[{msg['timestamp']}] " if msg["timestamp"] else ""
lines.append(f"{ts}{msg['content']}")
lines.append("")
lines += [
"---",
"",
"## 决策类回复",
"",
]
for msg in extracted["decision_messages"]:
ts = f"[{msg['timestamp']}] " if msg["timestamp"] else ""
lines.append(f"{ts}{msg['content']}")
lines.append("")
lines += [
"---",
"",
"## 日常沟通(风格参考)",
"",
]
# 日常消息只取前 100 条,避免太长
for msg in extracted["daily_messages"][:100]:
ts = f"[{msg['timestamp']}] " if msg["timestamp"] else ""
lines.append(f"{ts}{msg['content']}")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="解析飞书消息导出文件")
parser.add_argument("--file", required=True, help="输入文件路径(.json 或 .txt")
parser.add_argument("--target", required=True, help="目标人物姓名(只提取此人发出的消息)")
parser.add_argument("--output", default=None, help="输出文件路径(默认打印到 stdout")
args = parser.parse_args()
file_path = Path(args.file)
if not file_path.exists():
print(f"错误:文件不存在 {file_path}", file=sys.stderr)
sys.exit(1)
# 根据文件类型选择解析器
if file_path.suffix.lower() == ".json":
messages = parse_feishu_json(str(file_path), args.target)
else:
messages = parse_feishu_txt(str(file_path), args.target)
if not messages:
print(f"警告:未找到 '{args.target}' 发出的消息", file=sys.stderr)
print("提示:请检查目标姓名是否与文件中的发送人名称一致", file=sys.stderr)
extracted = extract_key_content(messages)
output = format_output(args.target, extracted)
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
f.write(output)
print(f"已输出到 {args.output},共 {len(messages)} 条消息")
else:
print(output)
if __name__ == "__main__":
main()

380
tools/skill_writer.py Normal file
View File

@@ -0,0 +1,380 @@
#!/usr/bin/env python3
"""
Skill 文件写入器
负责将生成的 work.md、persona.md 写入到正确的目录结构,
并生成 meta.json 和完整的 SKILL.md。
用法:
python3 skill_writer.py --action create --slug zhangsan --meta meta.json \
--work work_content.md --persona persona_content.md \
--base-dir ./colleagues
python3 skill_writer.py --action update --slug zhangsan \
--work-patch work_patch.md --persona-patch persona_patch.md \
--base-dir ./colleagues
python3 skill_writer.py --action list --base-dir ./colleagues
"""
from __future__ import annotations
import json
import shutil
import argparse
import sys
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional
SKILL_MD_TEMPLATE = """\
---
name: colleague_{slug}
description: {name},{identity}
user-invocable: true
---
# {name}
{identity}
---
## PART A:工作能力
{work_content}
---
## PART B:人物性格
{persona_content}
---
## 运行规则
接收到任何任务或问题时:
1. **先由 PART B 判断**:你会不会接这个任务?用什么态度接?
2. **再由 PART A 执行**:用你的技术能力和工作方法完成任务
3. **输出时保持 PART B 的表达风格**:你说话的方式、用词习惯、句式
**PART B 的 Layer 0 规则永远优先,任何情况下不得违背。**
"""
def slugify(name: str) -> str:
"""
将姓名转为 slug。
    优先尝试 pypinyin(如已安装),否则 fallback 到简单处理。
"""
# 尝试用 pypinyin 转拼音
try:
from pypinyin import lazy_pinyin
parts = lazy_pinyin(name)
slug = "_".join(parts)
    except ImportError:
        # fallback:保留 ASCII 字母数字,中文直接去掉
        result = []
        for char in name.lower():
            if char.isascii() and (char.isalnum() or char in ("-", "_")):
                result.append(char)
            elif char == " ":
                result.append("_")
            # 中文字符跳过(无 pypinyin 时无法转换)
        slug = "".join(result)
# 清理:去掉连续下划线,首尾下划线
import re
slug = re.sub(r"_+", "_", slug).strip("_")
return slug if slug else "colleague"
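# 示例(假设已安装 pypinyin):
#   slugify("张三")   -> "zhang_san"
#   slugify("王小明") -> "wang_xiao_ming"
# 未安装 pypinyin 时中文字符会被丢弃;若结果为空则返回兜底值 "colleague"。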
def build_identity_string(meta: dict) -> str:
"""从 meta 构建身份描述字符串"""
profile = meta.get("profile", {})
parts = []
company = profile.get("company", "")
level = profile.get("level", "")
role = profile.get("role", "")
if company:
parts.append(company)
if level:
parts.append(level)
if role:
parts.append(role)
identity = " ".join(parts) if parts else "同事"
mbti = profile.get("mbti", "")
if mbti:
identity += f"MBTI {mbti}"
return identity
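# 示例(meta 内容为假设示例):
#   {"profile": {"company": "某科技公司", "level": "P7", "role": "后端负责人", "mbti": "INTJ"}}
#   -> "某科技公司 P7 后端负责人,MBTI INTJ"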
def create_skill(
base_dir: Path,
slug: str,
meta: dict,
work_content: str,
persona_content: str,
) -> Path:
"""创建新的同事 Skill 目录结构"""
skill_dir = base_dir / slug
skill_dir.mkdir(parents=True, exist_ok=True)
# 创建子目录
(skill_dir / "versions").mkdir(exist_ok=True)
(skill_dir / "knowledge" / "docs").mkdir(parents=True, exist_ok=True)
(skill_dir / "knowledge" / "messages").mkdir(parents=True, exist_ok=True)
(skill_dir / "knowledge" / "emails").mkdir(parents=True, exist_ok=True)
# 写入 work.md
(skill_dir / "work.md").write_text(work_content, encoding="utf-8")
# 写入 persona.md
(skill_dir / "persona.md").write_text(persona_content, encoding="utf-8")
# 生成并写入 SKILL.md
name = meta.get("name", slug)
identity = build_identity_string(meta)
skill_md = SKILL_MD_TEMPLATE.format(
slug=slug,
name=name,
identity=identity,
work_content=work_content,
persona_content=persona_content,
)
(skill_dir / "SKILL.md").write_text(skill_md, encoding="utf-8")
# 写入 work-only skill
work_only = (
f"---\nname: colleague_{slug}_work\n"
f"description: {name} 的工作能力(仅 Work无 Persona\n"
f"user-invocable: true\n---\n\n{work_content}\n"
)
(skill_dir / "work_skill.md").write_text(work_only, encoding="utf-8")
# 写入 persona-only skill
persona_only = (
f"---\nname: colleague_{slug}_persona\n"
f"description: {name} 的人物性格(仅 Persona无工作能力\n"
f"user-invocable: true\n---\n\n{persona_content}\n"
)
(skill_dir / "persona_skill.md").write_text(persona_only, encoding="utf-8")
# 写入 meta.json
now = datetime.now(timezone.utc).isoformat()
meta["slug"] = slug
meta.setdefault("created_at", now)
meta["updated_at"] = now
meta["version"] = "v1"
meta.setdefault("corrections_count", 0)
(skill_dir / "meta.json").write_text(
json.dumps(meta, ensure_ascii=False, indent=2),
encoding="utf-8",
)
return skill_dir
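# 生成的 meta.json 结构示意(字段值为假设示例):
#   {
#     "name": "张三", "slug": "zhang_san",
#     "profile": {"company": "某科技公司", "level": "P7", "role": "后端负责人", "mbti": "INTJ"},
#     "version": "v1", "corrections_count": 0,
#     "created_at": "2024-01-01T00:00:00+00:00", "updated_at": "2024-01-01T00:00:00+00:00"
#   }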
def update_skill(
skill_dir: Path,
work_patch: Optional[str] = None,
persona_patch: Optional[str] = None,
correction: Optional[dict] = None,
) -> str:
"""更新现有 Skill先存档当前版本再写入更新"""
meta_path = skill_dir / "meta.json"
meta = json.loads(meta_path.read_text(encoding="utf-8"))
current_version = meta.get("version", "v1")
try:
version_num = int(current_version.lstrip("v").split("_")[0]) + 1
except ValueError:
version_num = 2
new_version = f"v{version_num}"
# 存档当前版本
version_dir = skill_dir / "versions" / current_version
version_dir.mkdir(parents=True, exist_ok=True)
for fname in ("SKILL.md", "work.md", "persona.md"):
src = skill_dir / fname
if src.exists():
shutil.copy2(src, version_dir / fname)
# 应用 work patch
if work_patch:
current_work = (skill_dir / "work.md").read_text(encoding="utf-8")
new_work = current_work + "\n\n" + work_patch
(skill_dir / "work.md").write_text(new_work, encoding="utf-8")
# 应用 persona patch 或 correction
if persona_patch or correction:
current_persona = (skill_dir / "persona.md").read_text(encoding="utf-8")
if correction:
correction_line = (
f"\n- [{correction.get('scene', '通用')}] "
f"不应该 {correction['wrong']},应该 {correction['correct']}"
)
target = "## Correction 记录"
if target in current_persona:
insert_pos = current_persona.index(target) + len(target)
# 跳过紧跟的空行和"暂无"占位行
rest = current_persona[insert_pos:]
skip = "\n\n(暂无记录)"
if rest.startswith(skip):
rest = rest[len(skip):]
new_persona = current_persona[:insert_pos] + correction_line + rest
else:
new_persona = (
current_persona
+ f"\n\n## Correction 记录\n{correction_line}\n"
)
meta["corrections_count"] = meta.get("corrections_count", 0) + 1
else:
new_persona = current_persona + "\n\n" + persona_patch
(skill_dir / "persona.md").write_text(new_persona, encoding="utf-8")
# 重新生成 SKILL.md
work_content = (skill_dir / "work.md").read_text(encoding="utf-8")
persona_content = (skill_dir / "persona.md").read_text(encoding="utf-8")
name = meta.get("name", skill_dir.name)
identity = build_identity_string(meta)
skill_md = SKILL_MD_TEMPLATE.format(
slug=skill_dir.name,
name=name,
identity=identity,
work_content=work_content,
persona_content=persona_content,
)
(skill_dir / "SKILL.md").write_text(skill_md, encoding="utf-8")
# 更新 meta
meta["version"] = new_version
meta["updated_at"] = datetime.now(timezone.utc).isoformat()
meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
return new_version
def list_colleagues(base_dir: Path) -> list:
"""列出所有已创建的同事 Skill"""
colleagues = []
if not base_dir.exists():
return colleagues
for skill_dir in sorted(base_dir.iterdir()):
if not skill_dir.is_dir():
continue
meta_path = skill_dir / "meta.json"
if not meta_path.exists():
continue
try:
meta = json.loads(meta_path.read_text(encoding="utf-8"))
except Exception:
continue
colleagues.append({
"slug": meta.get("slug", skill_dir.name),
"name": meta.get("name", skill_dir.name),
"identity": build_identity_string(meta),
"version": meta.get("version", "v1"),
"updated_at": meta.get("updated_at", ""),
"corrections_count": meta.get("corrections_count", 0),
})
return colleagues
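# 返回条目结构示意(值为假设示例):
#   {"slug": "zhang_san", "name": "张三", "identity": "某科技公司 P7 后端负责人",
#    "version": "v3", "updated_at": "2026-03-01T08:00:00+00:00", "corrections_count": 2}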
def main() -> None:
parser = argparse.ArgumentParser(description="Skill 文件写入器")
parser.add_argument("--action", required=True, choices=["create", "update", "list"])
parser.add_argument("--slug", help="同事 slug用于目录名")
parser.add_argument("--name", help="同事姓名")
parser.add_argument("--meta", help="meta.json 文件路径")
parser.add_argument("--work", help="work.md 内容文件路径")
parser.add_argument("--persona", help="persona.md 内容文件路径")
parser.add_argument("--work-patch", help="work.md 增量更新内容文件路径")
parser.add_argument("--persona-patch", help="persona.md 增量更新内容文件路径")
parser.add_argument(
"--base-dir",
default="./colleagues",
help="同事 Skill 根目录(默认:./colleagues",
)
args = parser.parse_args()
base_dir = Path(args.base_dir).expanduser()
if args.action == "list":
colleagues = list_colleagues(base_dir)
if not colleagues:
print("暂无已创建的同事 Skill")
else:
print(f"已创建 {len(colleagues)} 个同事 Skill\n")
for c in colleagues:
updated = c["updated_at"][:10] if c["updated_at"] else "未知"
print(f" [{c['slug']}] {c['name']}{c['identity']}")
print(f" 版本: {c['version']} 纠正次数: {c['corrections_count']} 更新: {updated}")
print()
elif args.action == "create":
if not args.slug and not args.name:
print("错误create 操作需要 --slug 或 --name", file=sys.stderr)
sys.exit(1)
meta: dict = {}
if args.meta:
meta = json.loads(Path(args.meta).read_text(encoding="utf-8"))
if args.name:
meta["name"] = args.name
slug = args.slug or slugify(meta.get("name", "colleague"))
work_content = ""
if args.work:
work_content = Path(args.work).read_text(encoding="utf-8")
persona_content = ""
if args.persona:
persona_content = Path(args.persona).read_text(encoding="utf-8")
skill_dir = create_skill(base_dir, slug, meta, work_content, persona_content)
print(f"✅ Skill 已创建:{skill_dir}")
print(f" 触发词:/{slug}")
elif args.action == "update":
if not args.slug:
print("错误update 操作需要 --slug", file=sys.stderr)
sys.exit(1)
skill_dir = base_dir / args.slug
if not skill_dir.exists():
print(f"错误:找不到 Skill 目录 {skill_dir}", file=sys.stderr)
sys.exit(1)
work_patch = Path(args.work_patch).read_text(encoding="utf-8") if args.work_patch else None
persona_patch = Path(args.persona_patch).read_text(encoding="utf-8") if args.persona_patch else None
new_version = update_skill(skill_dir, work_patch, persona_patch)
print(f"✅ Skill 已更新到 {new_version}{skill_dir}")
if __name__ == "__main__":
main()

154
tools/version_manager.py Normal file
View File

@@ -0,0 +1,154 @@
#!/usr/bin/env python3
"""
版本管理器
负责 Skill 文件的版本存档和回滚。
用法:
python version_manager.py --action list --slug zhangsan --base-dir ~/.openclaw/...
python version_manager.py --action rollback --slug zhangsan --version v2 --base-dir ~/.openclaw/...
"""
from __future__ import annotations
import json
import shutil
import argparse
import sys
from pathlib import Path
from datetime import datetime, timezone
MAX_VERSIONS = 10 # 最多保留的版本数
def list_versions(skill_dir: Path) -> list:
"""列出所有历史版本"""
versions_dir = skill_dir / "versions"
if not versions_dir.exists():
return []
versions = []
for v_dir in sorted(versions_dir.iterdir()):
if not v_dir.is_dir():
continue
# 从目录名解析版本号
version_name = v_dir.name
# 获取存档时间(用目录修改时间近似)
mtime = v_dir.stat().st_mtime
archived_at = datetime.fromtimestamp(mtime, tz=timezone.utc).strftime("%Y-%m-%d %H:%M")
# 统计文件
files = [f.name for f in v_dir.iterdir() if f.is_file()]
versions.append({
"version": version_name,
"archived_at": archived_at,
"files": files,
"path": str(v_dir),
})
return versions
def rollback(skill_dir: Path, target_version: str) -> bool:
"""回滚到指定版本"""
version_dir = skill_dir / "versions" / target_version
if not version_dir.exists():
print(f"错误:版本 {target_version} 不存在", file=sys.stderr)
return False
# 先存档当前版本
meta_path = skill_dir / "meta.json"
if meta_path.exists():
meta = json.loads(meta_path.read_text(encoding="utf-8"))
current_version = meta.get("version", "v?")
backup_dir = skill_dir / "versions" / f"{current_version}_before_rollback"
backup_dir.mkdir(parents=True, exist_ok=True)
for fname in ("SKILL.md", "work.md", "persona.md"):
src = skill_dir / fname
if src.exists():
shutil.copy2(src, backup_dir / fname)
# 从目标版本恢复文件
restored_files = []
for fname in ("SKILL.md", "work.md", "persona.md"):
src = version_dir / fname
if src.exists():
shutil.copy2(src, skill_dir / fname)
restored_files.append(fname)
# 更新 meta
if meta_path.exists():
meta = json.loads(meta_path.read_text(encoding="utf-8"))
meta["version"] = target_version + "_restored"
meta["updated_at"] = datetime.now(timezone.utc).isoformat()
meta["rollback_from"] = current_version
meta_path.write_text(json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"已回滚到 {target_version},恢复文件:{', '.join(restored_files)}")
return True
def cleanup_old_versions(skill_dir: Path, max_versions: int = MAX_VERSIONS):
"""清理超出限制的旧版本"""
versions_dir = skill_dir / "versions"
if not versions_dir.exists():
return
    # 按修改时间排序,保留最新的 max_versions 个
version_dirs = sorted(
[d for d in versions_dir.iterdir() if d.is_dir()],
key=lambda d: d.stat().st_mtime,
)
to_delete = version_dirs[:-max_versions] if len(version_dirs) > max_versions else []
for old_dir in to_delete:
shutil.rmtree(old_dir)
print(f"已清理旧版本:{old_dir.name}")
def main():
parser = argparse.ArgumentParser(description="Skill 版本管理器")
parser.add_argument("--action", required=True, choices=["list", "rollback", "cleanup"])
parser.add_argument("--slug", required=True, help="同事 slug")
parser.add_argument("--version", help="目标版本号rollback 时使用)")
parser.add_argument(
"--base-dir",
default="~/.openclaw/workspace/skills/colleagues",
help="同事 Skill 根目录",
)
args = parser.parse_args()
base_dir = Path(args.base_dir).expanduser()
skill_dir = base_dir / args.slug
if not skill_dir.exists():
print(f"错误:找不到 Skill 目录 {skill_dir}", file=sys.stderr)
sys.exit(1)
if args.action == "list":
versions = list_versions(skill_dir)
if not versions:
print(f"{args.slug} 暂无历史版本")
else:
print(f"{args.slug} 的历史版本:\n")
for v in versions:
print(f" {v['version']} 存档时间: {v['archived_at']} 文件: {', '.join(v['files'])}")
elif args.action == "rollback":
if not args.version:
print("错误rollback 操作需要 --version", file=sys.stderr)
sys.exit(1)
rollback(skill_dir, args.version)
elif args.action == "cleanup":
cleanup_old_versions(skill_dir)
print("清理完成")
if __name__ == "__main__":
main()