Files
colleague-skill/tools/email_parser.py
titanwings 6a0b31aa6c refactor: restructure to official AgentSkills/Claude Code skill format
- Flatten colleague-creator/ to repo root (repo = skill directory)
- Update SKILL.md frontmatter with official fields: name, description,
  argument-hint, version, user-invocable, allowed-tools
- Move PRD.md → docs/PRD.md
- Add .gitignore, requirements.txt, LICENSE
- Update README and INSTALL docs to reflect new structure and git clone install

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-30 13:37:54 +08:00

340 lines
9.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
邮件解析器
支持格式:
1. .eml 文件(标准邮件格式)
2. .txt 文件(纯文本邮件记录)
3. .mbox 文件(多封邮件合集)
用法:
python email_parser.py --file emails.eml --target "zhangsan@company.com" --output output.txt
python email_parser.py --file inbox.mbox --target "张三" --output output.txt
"""
import email
import email.policy
import mailbox
import re
import sys
import argparse
from pathlib import Path
from email.header import decode_header
from html.parser import HTMLParser
class HTMLTextExtractor(HTMLParser):
    """Extract plain text from the HTML body of an email.

    Text inside ``<script>`` and ``<style>`` elements is discarded. A line
    break is emitted for every ``<br>`` and at the end of each ``<p>``,
    ``<div>`` and ``<tr>`` element. Call :meth:`feed` with the markup, then
    :meth:`get_text` to obtain the collected text.
    """

    # Elements whose text content must never reach the output.
    _SKIPPED_TAGS = ("script", "style")
    # Closing one of these tags ends a visual line.
    _LINE_BREAK_END_TAGS = ("p", "br", "div", "tr")

    def __init__(self):
        super().__init__()
        self.result = []      # collected text fragments, joined by get_text()
        self._skip = False    # True while inside <script>/<style>

    def handle_starttag(self, tag, attrs):
        if tag in self._SKIPPED_TAGS:
            self._skip = True
        elif tag == "br":
            # <br> is a void element: it normally has no closing tag, so the
            # line break has to be emitted here rather than in handle_endtag.
            self.result.append("\n")

    def handle_endtag(self, tag):
        if tag in self._SKIPPED_TAGS:
            self._skip = False
        if tag in self._LINE_BREAK_END_TAGS:
            self.result.append("\n")

    def handle_startendtag(self, tag, attrs):
        # The base implementation calls both handlers; for the self-closing
        # form <br/> that would now produce two line breaks instead of one.
        self.handle_starttag(tag, attrs)
        if tag != "br":
            self.handle_endtag(tag)

    def handle_data(self, data):
        if not self._skip:
            self.result.append(data)

    def get_text(self):
        """Return the collected text, stripped, with 3+ newlines collapsed to 2."""
        return re.sub(r"\n{3,}", "\n\n", "".join(self.result)).strip()
def decode_mime_str(s: str) -> str:
    """Decode a MIME encoded-word header value into a plain string.

    Args:
        s: Raw header value, possibly containing ``=?charset?enc?...?=``
            encoded words. Falsy values yield ``""``.

    Returns:
        The decoded text. Bytes that are invalid in the declared charset are
        replaced rather than raising. If the header is malformed beyond what
        ``decode_header`` can parse, the input is returned unchanged.
    """
    # Imported locally so the block does not depend on a new file-level import.
    from email.errors import HeaderParseError

    if not s:
        return ""
    try:
        parts = decode_header(s)
    except HeaderParseError:
        # e.g. a corrupt base64 encoded-word; one bad header should not
        # abort the whole parsing run, so fall back to the raw value.
        return str(s)

    decoded = []
    for part, charset in parts:
        if not isinstance(part, bytes):
            decoded.append(str(part))
            continue
        try:
            decoded.append(part.decode(charset or "utf-8", errors="replace"))
        except (LookupError, ValueError):
            # Unknown or invalid charset label: best effort with UTF-8.
            decoded.append(part.decode("utf-8", errors="replace"))
    return "".join(decoded)
def _decode_part_text(part) -> str:
    """Decode the payload of one non-multipart MIME part to text ("" if none)."""
    payload = part.get_payload(decode=True)
    if not payload:
        # get_payload(decode=True) returns None for container parts.
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="replace")
    except (LookupError, ValueError):
        # Unknown or invalid charset label: best effort with UTF-8.
        return payload.decode("utf-8", errors="replace")


def _html_to_text(html: str) -> str:
    """Reduce HTML markup to plain text using HTMLTextExtractor."""
    extractor = HTMLTextExtractor()
    extractor.feed(html)
    extractor.close()  # flush any text the parser is still buffering
    return extractor.get_text()


def extract_email_body(msg) -> str:
    """Extract the body text of an email message, without quoted replies.

    For multipart messages the first non-attachment ``text/plain`` part wins;
    if none is found, the first ``text/html`` part that yields text is
    converted to plain text. A single-part ``text/html`` message is converted
    the same way.

    Quoted material is then removed: lines starting with ``>``, everything
    from a ``--- ... 原始邮件`` separator line onwards, and everything from a
    line of three or more underscores onwards.

    Args:
        msg: An ``email.message.Message`` (or subclass) instance.

    Returns:
        The cleaned, stripped body text; ``""`` when no text is found.
    """
    body = ""
    if msg.is_multipart():
        for part in msg.walk():
            disposition = str(part.get("Content-Disposition", "")).lower()
            if "attachment" in disposition:
                continue
            content_type = part.get_content_type()
            if content_type == "text/plain":
                body = _decode_part_text(part)
                break
            if content_type == "text/html" and not body:
                body = _html_to_text(_decode_part_text(part))
    else:
        body = _decode_part_text(msg)
        if body and msg.get_content_type() == "text/html":
            body = _html_to_text(body)

    # Strip quoted reply content. Patterns are anchored at line starts so a
    # quote or separator on the very first line is handled too.
    body = re.sub(r"^>.*\n?", "", body, flags=re.MULTILINE)
    # The separator must sit on a single line; everything after it is the
    # quoted original message and is dropped.
    body = re.sub(
        r"^-{3,}[^\n]*原始邮件.*", "", body, flags=re.MULTILINE | re.DOTALL
    )
    # \r? keeps the pattern working on bodies that retain CRLF line endings.
    body = re.sub(
        r"^_{3,}[ \t]*\r?\n.*", "", body, flags=re.MULTILINE | re.DOTALL
    )
    return body.strip()
def is_from_target(from_field: str, target: str) -> bool:
    """Return True when the decoded From header contains *target*.

    The comparison is a case-insensitive substring match, so *target* may be
    an email address or a display name.
    """
    sender = decode_mime_str(from_field)
    return target.lower() in sender.lower()
def parse_eml_file(file_path: str, target: str) -> list[dict]:
    """Parse one .eml file and return it if it was sent by *target*.

    Returns a one-element list holding a dict with the keys ``from``,
    ``subject``, ``date`` and ``body``, or an empty list when the sender
    does not match or no body text could be extracted.
    """
    with open(file_path, "rb") as handle:
        message = email.message_from_binary_file(
            handle, policy=email.policy.default
        )

    sender = str(message.get("From", ""))
    if not is_from_target(sender, target):
        return []

    subject = decode_mime_str(str(message.get("Subject", "")))
    sent_at = str(message.get("Date", ""))
    text = extract_email_body(message)
    if not text:
        return []

    record = {
        "from": decode_mime_str(sender),
        "subject": subject,
        "date": sent_at,
        "body": text,
    }
    return [record]
def parse_mbox_file(file_path: str, target: str) -> list[dict]:
    """Parse an .mbox file and collect the messages sent by *target*.

    Args:
        file_path: Path to an existing mbox file.
        target: Email address or name matched against each From header.

    Returns:
        One dict per matching message that has body text, with the keys
        ``from``, ``subject``, ``date`` and ``body``.

    Raises:
        mailbox.NoSuchMailboxError: If *file_path* does not exist.
    """
    results = []
    # create=False: a mistyped path must not leave an empty mailbox file
    # behind as a side effect of a read-only operation.
    mbox = mailbox.mbox(file_path, create=False)
    try:
        for msg in mbox:
            from_field = str(msg.get("From", ""))
            if not is_from_target(from_field, target):
                continue
            subject = decode_mime_str(str(msg.get("Subject", "")))
            date = str(msg.get("Date", ""))
            body = extract_email_body(msg)
            if not body:
                continue
            results.append({
                "from": decode_mime_str(from_field),
                "subject": subject,
                "date": date,
                "body": body,
            })
    finally:
        # Release the underlying file handle even if parsing fails midway.
        mbox.close()
    return results
def parse_txt_file(file_path: str, target: str) -> list[dict]:
    """Parse a plain-text email log and collect the messages sent by *target*.

    The expected layout of each message is::

        From: xxx
        Subject: xxx
        Date: xxx
        ---
        body text
        ===

    Messages are delimited by a line of three or more ``=`` characters, or
    by a line of three or more ``-`` characters directly followed by a
    ``From:`` line.

    Args:
        file_path: Path to a UTF-8 text file (a leading BOM is tolerated).
        target: Email address or name matched against each From field.

    Returns:
        One dict per matching message that has body text, with the keys
        ``from``, ``subject``, ``date`` and ``body``.
    """
    results = []
    # utf-8-sig drops a leading byte-order mark, which would otherwise sit
    # in front of the first "From:" and hide that header from the regex.
    with open(file_path, "r", encoding="utf-8-sig") as f:
        content = f.read()

    # Split into individual messages. The "=" delimiter may also be the last
    # line of the file without a trailing newline.
    emails_raw = re.split(r"\n={3,}[ \t]*(?:\n|$)|\n-{3,}\n(?=From:)", content)
    for raw in emails_raw:
        from_match = re.search(r"^From:\s*(.+)$", raw, re.MULTILINE)
        subject_match = re.search(r"^Subject:\s*(.+)$", raw, re.MULTILINE)
        date_match = re.search(r"^Date:\s*(.+)$", raw, re.MULTILINE)
        from_field = from_match.group(1).strip() if from_match else ""
        if not is_from_target(from_field, target):
            continue

        # The body is what remains once the header lines are removed ...
        body = re.sub(
            r"^(From|To|Subject|Date|CC|BCC):.*\n?", "", raw, flags=re.MULTILINE
        ).strip()
        # ... minus the "---" line that separates headers from body.
        body = re.sub(r"\A-{3,}[ \t]*(?:\n|\Z)", "", body).strip()
        if not body:
            continue

        results.append({
            "from": from_field,
            "subject": subject_match.group(1).strip() if subject_match else "",
            "date": date_match.group(1).strip() if date_match else "",
            "body": body,
        })
    return results
def classify_emails(emails: list[dict], long_threshold: int = 200) -> dict:
    """Sort emails into long, decision and daily buckets by their body text.

    Each email lands in exactly one bucket, checked in this order:

    - long: body longer than *long_threshold* characters
    - decision: body contains one of the decision keywords (case-insensitive)
    - daily: everything else

    Args:
        emails: Dicts that each carry a ``"body"`` string.
        long_threshold: Body length above which an email counts as long.

    Returns:
        A dict with the lists ``long_emails``, ``decision_emails`` and
        ``daily_emails`` plus ``total_count``, the number of input emails.
    """
    decision_keywords = (
        "同意", "不同意", "建议", "方案", "觉得", "应该", "决定", "确认",
        "approve", "reject", "lgtm", "suggest", "recommend", "think",
        "我的看法", "我认为", "我觉得", "需要", "必须", "不需要",
    )
    long_emails = []
    decision_emails = []
    daily_emails = []
    for e in emails:
        body = e["body"]
        if len(body) > long_threshold:
            long_emails.append(e)
            continue
        # Lower-case once per email rather than once per keyword.
        lowered = body.lower()
        if any(kw in lowered for kw in decision_keywords):
            decision_emails.append(e)
        else:
            daily_emails.append(e)
    return {
        "long_emails": long_emails,
        "decision_emails": decision_emails,
        "daily_emails": daily_emails,
        "total_count": len(emails),
    }
def format_output(
    target: str,
    classified: dict,
    max_daily: int = 30,
    daily_preview_chars: int = 200,
) -> str:
    """Render classified emails as a Markdown report for AI analysis.

    The report has a header followed by three sections: long emails and
    decision emails in full, then daily emails as one line each.

    Args:
        target: Name or address of the person the emails belong to.
        classified: Result of ``classify_emails``; must provide the keys
            ``long_emails``, ``decision_emails``, ``daily_emails`` and
            ``total_count``.
        max_daily: Maximum number of daily emails included in the report.
        daily_preview_chars: Number of body characters shown per daily email.

    Returns:
        The report as a single newline-joined string.
    """
    lines = [
        "# 邮件提取结果",
        f"目标人物:{target}",
        f"总邮件数:{classified['total_count']}",
        "",
        "---",
        "",
        "## 长邮件(技术方案/观点类,权重最高)",
        "",
    ]
    for e in classified["long_emails"]:
        lines += [
            f"**主题:{e['subject']}** [{e['date']}]",
            e["body"],
            "",
            "---",
            "",
        ]

    lines += ["## 决策类邮件", ""]
    for e in classified["decision_emails"]:
        lines += [
            f"**主题:{e['subject']}** [{e['date']}]",
            e["body"],
            "",
        ]

    lines += ["---", "", "## 日常沟通(风格参考)", ""]
    # Daily emails only serve as a style reference, so they are capped in
    # count and truncated in length.
    for e in classified["daily_emails"][:max_daily]:
        lines += [f"**{e['subject']}**{e['body'][:daily_preview_chars]}", ""]

    return "\n".join(lines)
def main():
    """Command-line entry point: parse an email file and emit the report.

    Reads ``--file``, ``--target`` and the optional ``--output`` from the
    command line, picks a parser by file extension (``.eml``, ``.mbox``,
    anything else is treated as plain text), classifies the matching emails
    and writes the formatted report to ``--output`` or to stdout. Exits with
    status 1 when the input file does not exist.
    """
    parser = argparse.ArgumentParser(description="解析邮件文件,提取目标人发出的邮件")
    # The two help strings below were missing their closing parenthesis.
    parser.add_argument("--file", required=True, help="输入文件路径(.eml / .mbox / .txt)")
    parser.add_argument("--target", required=True, help="目标人物(邮箱地址或姓名)")
    parser.add_argument("--output", default=None, help="输出文件路径(默认打印到 stdout)")
    args = parser.parse_args()

    file_path = Path(args.file)
    if not file_path.exists():
        print(f"错误:文件不存在 {file_path}", file=sys.stderr)
        sys.exit(1)

    suffix = file_path.suffix.lower()
    if suffix == ".eml":
        emails = parse_eml_file(str(file_path), args.target)
    elif suffix == ".mbox":
        emails = parse_mbox_file(str(file_path), args.target)
    else:
        emails = parse_txt_file(str(file_path), args.target)

    if not emails:
        # Only a warning: an (empty) report is still produced below.
        print(f"警告:未找到来自 '{args.target}' 的邮件", file=sys.stderr)
        print("提示:请检查目标名称/邮箱是否与文件中的 From 字段一致", file=sys.stderr)

    classified = classify_emails(emails)
    output = format_output(args.target, classified)

    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(output)
        print(f"已输出到 {args.output},共 {len(emails)} 封邮件")
    else:
        print(output)


if __name__ == "__main__":
    main()