mirror of
https://github.com/titanwings/colleague-skill.git
synced 2026-04-04 22:59:06 +08:00
功能: - 通过飞书/钉钉自动采集同事的消息、文档、多维表格 - 支持 PDF、邮件、截图等手动上传 - 分析生成 Work Skill(工作能力)和 Persona(人物性格)两部分 - 支持对话纠正和版本管理 - 兼容 OpenClaw 和 Claude Code 双平台 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
340 lines
9.8 KiB
Python
340 lines
9.8 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
邮件解析器
|
||
|
||
支持格式:
|
||
1. .eml 文件(标准邮件格式)
|
||
2. .txt 文件(纯文本邮件记录)
|
||
3. .mbox 文件(多封邮件合集)
|
||
|
||
用法:
|
||
python email_parser.py --file emails.eml --target "zhangsan@company.com" --output output.txt
|
||
python email_parser.py --file inbox.mbox --target "张三" --output output.txt
|
||
"""
|
||
|
||
import email
|
||
import email.policy
|
||
import mailbox
|
||
import re
|
||
import sys
|
||
import argparse
|
||
from pathlib import Path
|
||
from email.header import decode_header
|
||
from html.parser import HTMLParser
|
||
|
||
|
||
class HTMLTextExtractor(HTMLParser):
|
||
"""从 HTML 邮件内容中提取纯文本"""
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.result = []
|
||
self._skip = False
|
||
|
||
def handle_starttag(self, tag, attrs):
|
||
if tag in ("script", "style"):
|
||
self._skip = True
|
||
|
||
def handle_endtag(self, tag):
|
||
if tag in ("script", "style"):
|
||
self._skip = False
|
||
if tag in ("p", "br", "div", "tr"):
|
||
self.result.append("\n")
|
||
|
||
def handle_data(self, data):
|
||
if not self._skip:
|
||
self.result.append(data)
|
||
|
||
def get_text(self):
|
||
return re.sub(r"\n{3,}", "\n\n", "".join(self.result)).strip()
|
||
|
||
|
||
def decode_mime_str(s: str) -> str:
|
||
"""解码 MIME 编码的邮件头字段"""
|
||
if not s:
|
||
return ""
|
||
parts = decode_header(s)
|
||
result = []
|
||
for part, charset in parts:
|
||
if isinstance(part, bytes):
|
||
charset = charset or "utf-8"
|
||
try:
|
||
result.append(part.decode(charset, errors="replace"))
|
||
except Exception:
|
||
result.append(part.decode("utf-8", errors="replace"))
|
||
else:
|
||
result.append(str(part))
|
||
return "".join(result)
|
||
|
||
|
||
def extract_email_body(msg) -> str:
|
||
"""从邮件对象中提取正文文本"""
|
||
body = ""
|
||
|
||
if msg.is_multipart():
|
||
for part in msg.walk():
|
||
content_type = part.get_content_type()
|
||
disposition = str(part.get("Content-Disposition", ""))
|
||
|
||
if "attachment" in disposition:
|
||
continue
|
||
|
||
if content_type == "text/plain":
|
||
payload = part.get_payload(decode=True)
|
||
charset = part.get_content_charset() or "utf-8"
|
||
try:
|
||
body = payload.decode(charset, errors="replace")
|
||
break
|
||
except Exception:
|
||
body = payload.decode("utf-8", errors="replace")
|
||
break
|
||
|
||
elif content_type == "text/html" and not body:
|
||
payload = part.get_payload(decode=True)
|
||
charset = part.get_content_charset() or "utf-8"
|
||
try:
|
||
html = payload.decode(charset, errors="replace")
|
||
except Exception:
|
||
html = payload.decode("utf-8", errors="replace")
|
||
extractor = HTMLTextExtractor()
|
||
extractor.feed(html)
|
||
body = extractor.get_text()
|
||
else:
|
||
payload = msg.get_payload(decode=True)
|
||
if payload:
|
||
charset = msg.get_content_charset() or "utf-8"
|
||
try:
|
||
body = payload.decode(charset, errors="replace")
|
||
except Exception:
|
||
body = payload.decode("utf-8", errors="replace")
|
||
|
||
# 清理引用内容(Re: 时的原文引用)
|
||
body = re.sub(r"\n>.*", "", body)
|
||
body = re.sub(r"\n-{3,}.*?原始邮件.*?\n", "\n", body, flags=re.DOTALL)
|
||
body = re.sub(r"\n_{3,}\n.*", "", body, flags=re.DOTALL)
|
||
|
||
return body.strip()
|
||
|
||
|
||
def is_from_target(from_field: str, target: str) -> bool:
|
||
"""判断邮件是否来自目标人"""
|
||
from_str = decode_mime_str(from_field).lower()
|
||
target_lower = target.lower()
|
||
return target_lower in from_str
|
||
|
||
|
||
def parse_eml_file(file_path: str, target: str) -> list[dict]:
|
||
"""解析单个 .eml 文件"""
|
||
with open(file_path, "rb") as f:
|
||
msg = email.message_from_binary_file(f, policy=email.policy.default)
|
||
|
||
from_field = str(msg.get("From", ""))
|
||
if not is_from_target(from_field, target):
|
||
return []
|
||
|
||
subject = decode_mime_str(str(msg.get("Subject", "")))
|
||
date = str(msg.get("Date", ""))
|
||
body = extract_email_body(msg)
|
||
|
||
if not body:
|
||
return []
|
||
|
||
return [{
|
||
"from": decode_mime_str(from_field),
|
||
"subject": subject,
|
||
"date": date,
|
||
"body": body,
|
||
}]
|
||
|
||
|
||
def parse_mbox_file(file_path: str, target: str) -> list[dict]:
|
||
"""解析 .mbox 文件(多封邮件合集)"""
|
||
results = []
|
||
mbox = mailbox.mbox(file_path)
|
||
|
||
for msg in mbox:
|
||
from_field = str(msg.get("From", ""))
|
||
if not is_from_target(from_field, target):
|
||
continue
|
||
|
||
subject = decode_mime_str(str(msg.get("Subject", "")))
|
||
date = str(msg.get("Date", ""))
|
||
body = extract_email_body(msg)
|
||
|
||
if not body:
|
||
continue
|
||
|
||
results.append({
|
||
"from": decode_mime_str(from_field),
|
||
"subject": subject,
|
||
"date": date,
|
||
"body": body,
|
||
})
|
||
|
||
return results
|
||
|
||
|
||
def parse_txt_file(file_path: str, target: str) -> list[dict]:
|
||
"""
|
||
解析纯文本格式的邮件记录
|
||
支持简单的分隔格式:
|
||
From: xxx
|
||
Subject: xxx
|
||
Date: xxx
|
||
---
|
||
正文内容
|
||
===
|
||
"""
|
||
results = []
|
||
|
||
with open(file_path, "r", encoding="utf-8") as f:
|
||
content = f.read()
|
||
|
||
# 尝试按分隔符切割多封邮件
|
||
emails_raw = re.split(r"\n={3,}\n|\n-{3,}\n(?=From:)", content)
|
||
|
||
for raw in emails_raw:
|
||
from_match = re.search(r"^From:\s*(.+)$", raw, re.MULTILINE)
|
||
subject_match = re.search(r"^Subject:\s*(.+)$", raw, re.MULTILINE)
|
||
date_match = re.search(r"^Date:\s*(.+)$", raw, re.MULTILINE)
|
||
|
||
from_field = from_match.group(1).strip() if from_match else ""
|
||
if not is_from_target(from_field, target):
|
||
continue
|
||
|
||
# 提取正文(去掉头部字段后的内容)
|
||
body = re.sub(r"^(From|To|Subject|Date|CC|BCC):.*\n?", "", raw, flags=re.MULTILINE)
|
||
body = body.strip()
|
||
|
||
if not body:
|
||
continue
|
||
|
||
results.append({
|
||
"from": from_field,
|
||
"subject": subject_match.group(1).strip() if subject_match else "",
|
||
"date": date_match.group(1).strip() if date_match else "",
|
||
"body": body,
|
||
})
|
||
|
||
return results
|
||
|
||
|
||
def classify_emails(emails: list[dict]) -> dict:
|
||
"""
|
||
对邮件按内容分类:
|
||
- 长邮件(正文 > 200 字):技术方案、观点陈述
|
||
- 决策类:包含明确判断的邮件
|
||
- 日常沟通:短邮件
|
||
"""
|
||
long_emails = []
|
||
decision_emails = []
|
||
daily_emails = []
|
||
|
||
decision_keywords = [
|
||
"同意", "不同意", "建议", "方案", "觉得", "应该", "决定", "确认",
|
||
"approve", "reject", "lgtm", "suggest", "recommend", "think",
|
||
"我的看法", "我认为", "我觉得", "需要", "必须", "不需要"
|
||
]
|
||
|
||
for e in emails:
|
||
body = e["body"]
|
||
|
||
if len(body) > 200:
|
||
long_emails.append(e)
|
||
elif any(kw in body.lower() for kw in decision_keywords):
|
||
decision_emails.append(e)
|
||
else:
|
||
daily_emails.append(e)
|
||
|
||
return {
|
||
"long_emails": long_emails,
|
||
"decision_emails": decision_emails,
|
||
"daily_emails": daily_emails,
|
||
"total_count": len(emails),
|
||
}
|
||
|
||
|
||
def format_output(target: str, classified: dict) -> str:
|
||
"""格式化输出,供 AI 分析使用"""
|
||
lines = [
|
||
f"# 邮件提取结果",
|
||
f"目标人物:{target}",
|
||
f"总邮件数:{classified['total_count']}",
|
||
"",
|
||
"---",
|
||
"",
|
||
"## 长邮件(技术方案/观点类,权重最高)",
|
||
"",
|
||
]
|
||
|
||
for e in classified["long_emails"]:
|
||
lines.append(f"**主题:{e['subject']}** [{e['date']}]")
|
||
lines.append(e["body"])
|
||
lines.append("")
|
||
lines.append("---")
|
||
lines.append("")
|
||
|
||
lines += [
|
||
"## 决策类邮件",
|
||
"",
|
||
]
|
||
|
||
for e in classified["decision_emails"]:
|
||
lines.append(f"**主题:{e['subject']}** [{e['date']}]")
|
||
lines.append(e["body"])
|
||
lines.append("")
|
||
|
||
lines += [
|
||
"---",
|
||
"",
|
||
"## 日常沟通(风格参考)",
|
||
"",
|
||
]
|
||
|
||
for e in classified["daily_emails"][:30]:
|
||
lines.append(f"**{e['subject']}**:{e['body'][:200]}")
|
||
lines.append("")
|
||
|
||
return "\n".join(lines)
|
||
|
||
|
||
def main():
|
||
parser = argparse.ArgumentParser(description="解析邮件文件,提取目标人发出的邮件")
|
||
parser.add_argument("--file", required=True, help="输入文件路径(.eml / .mbox / .txt)")
|
||
parser.add_argument("--target", required=True, help="目标人物(邮箱地址或姓名)")
|
||
parser.add_argument("--output", default=None, help="输出文件路径(默认打印到 stdout)")
|
||
|
||
args = parser.parse_args()
|
||
|
||
file_path = Path(args.file)
|
||
if not file_path.exists():
|
||
print(f"错误:文件不存在 {file_path}", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
suffix = file_path.suffix.lower()
|
||
|
||
if suffix == ".eml":
|
||
emails = parse_eml_file(str(file_path), args.target)
|
||
elif suffix == ".mbox":
|
||
emails = parse_mbox_file(str(file_path), args.target)
|
||
else:
|
||
emails = parse_txt_file(str(file_path), args.target)
|
||
|
||
if not emails:
|
||
print(f"警告:未找到来自 '{args.target}' 的邮件", file=sys.stderr)
|
||
print("提示:请检查目标名称/邮箱是否与文件中的 From 字段一致", file=sys.stderr)
|
||
|
||
classified = classify_emails(emails)
|
||
output = format_output(args.target, classified)
|
||
|
||
if args.output:
|
||
with open(args.output, "w", encoding="utf-8") as f:
|
||
f.write(output)
|
||
print(f"已输出到 {args.output},共 {len(emails)} 封邮件")
|
||
else:
|
||
print(output)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|