fix: replace broken /search/v1/user with department traversal + batch_get_id

The old find_user() used /search/v1/user which requires user_access_token,
but the code only has tenant_access_token, causing error 99991663 every time.

New strategy:
1. Email/phone → /contact/v3/users/batch_get_id (fastest, proven working)
2. Name → department traversal + find_by_department (needs contact:department.base:readonly)
3. Both fail → clear error message with suggestions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
titanwings
2026-04-01 19:10:48 +08:00
parent 3e9cc07b62
commit ce30311a72

View File

@@ -63,7 +63,8 @@ def setup_config() -> None:
print(" im:chat.members:readonly 读取群成员")
print()
print(" 用户类:")
print(" contact:user.base:readonly 搜索用户")
print(" contact:user.base:readonly 读取用户基本信息")
print(" contact:department.base:readonly 遍历部门查找用户(按姓名搜索必需)")
print()
print(" 文档类:")
print(" docs:doc:readonly 读取文档")
@@ -133,36 +134,134 @@ def api_post(path: str, body: dict, config: dict) -> dict:
# ─── 用户搜索 ─────────────────────────────────────────────────────────────────
def find_user(name: str, config: dict) -> Optional[dict]:
"""通过姓名搜索飞书用户"""
print(f" 搜索用户:{name} ...", file=sys.stderr)
def _find_user_by_contact(name: str, config: dict) -> Optional[dict]:
"""通过邮箱或手机号查找用户(使用 tenant_access_token"""
# 判断输入类型
emails, mobiles = [], []
if "@" in name:
emails = [name]
elif name.replace("+", "").replace("-", "").isdigit():
mobiles = [name]
else:
return None # 不是邮箱或手机号,跳过
data = api_get(
"/search/v1/user",
{"query": name, "page_size": 10},
config,
)
body = {}
if emails:
body["emails"] = emails
if mobiles:
body["mobiles"] = mobiles
data = api_post("/contact/v3/users/batch_get_id", body, config)
if data.get("code") != 0:
print(f" 搜索用户失败code={data.get('code')}{data.get('msg')}", file=sys.stderr)
print(f" 邮箱/手机号查找失败code={data.get('code')}{data.get('msg')}", file=sys.stderr)
return None
users = data.get("data", {}).get("results", [])
user_list = data.get("data", {}).get("user_list", [])
for item in user_list:
user_id = item.get("user_id")
if user_id:
# 获取用户详情
detail = api_get(f"/contact/v3/users/{user_id}", {"user_id_type": "user_id"}, config)
if detail.get("code") == 0:
user_data = detail.get("data", {}).get("user", {})
print(f" 找到用户:{user_data.get('name', user_id)}", file=sys.stderr)
return user_data
# 如果详情拉不到,返回基本信息
return {"user_id": user_id, "open_id": item.get("open_id", ""), "name": name}
return None
def _find_user_by_department(name: str, config: dict) -> Optional[dict]:
"""遍历部门查找用户(使用 tenant_access_token需要 contact:department.base:readonly"""
print(f" 通过部门遍历查找 {name} ...", file=sys.stderr)
# 递归获取所有部门 ID
dept_ids = ["0"] # 0 = 根部门
queue = ["0"]
while queue:
parent_id = queue.pop(0)
data = api_get(
f"/contact/v3/departments/{parent_id}/children",
{"page_size": 50, "fetch_child": False},
config,
)
if data.get("code") != 0:
if parent_id == "0":
print(f" 部门遍历失败code={data.get('code')}{data.get('msg')}", file=sys.stderr)
print(f" 请确认已开通 contact:department.base:readonly 权限", file=sys.stderr)
return None
continue
children = data.get("data", {}).get("items", [])
for child in children:
child_id = child.get("department_id", "")
if child_id:
dept_ids.append(child_id)
queue.append(child_id)
print(f"{len(dept_ids)} 个部门,搜索用户 ...", file=sys.stderr)
# 在每个部门中查找用户
matches = []
for dept_id in dept_ids:
page_token = None
while True:
params = {"department_id": dept_id, "page_size": 50}
if page_token:
params["page_token"] = page_token
data = api_get("/contact/v3/users/find_by_department", params, config)
if data.get("code") != 0:
break
users = data.get("data", {}).get("items", [])
for u in users:
uname = u.get("name", "")
en_name = u.get("en_name", "")
if name in uname or name in en_name or uname == name or en_name == name:
matches.append(u)
if not data.get("data", {}).get("has_more"):
break
page_token = data.get("data", {}).get("page_token")
if len(matches) >= 10:
break # 够了
return _select_user(matches, name)
def _select_user(users: list, name: str) -> Optional[dict]:
"""从候选列表中选择用户"""
if not users:
print(f" 未找到用户:{name}", file=sys.stderr)
return None
# 去重(按 user_id
seen = set()
deduped = []
for u in users:
uid = u.get("user_id", u.get("open_id", id(u)))
if uid not in seen:
seen.add(uid)
deduped.append(u)
users = deduped
if len(users) == 1:
u = users[0]
print(f" 找到用户:{u.get('name')}{u.get('department_path', [''])[0]}", file=sys.stderr)
dept_ids = u.get("department_ids", [])
print(f" 找到用户:{u.get('name')}(部门:{dept_ids[0] if dept_ids else ''}", file=sys.stderr)
return u
# 多个结果,让用户选择
print(f"\n 找到 {len(users)} 个结果,请选择:")
for i, u in enumerate(users):
dept = u.get("department_path", [""])
dept_str = dept[0] if dept else ""
print(f" [{i+1}] {u.get('name')} {dept_str} {u.get('user_id', '')}")
dept_ids = u.get("department_ids", [])
dept_str = dept_ids[0] if dept_ids else ""
en = u.get("en_name", "")
label = f"{u.get('name', '')} ({en})" if en else u.get("name", "")
print(f" [{i+1}] {label} dept={dept_str} uid={u.get('user_id', '')}")
choice = input("\n 选择编号(默认 1").strip() or "1"
try:
@@ -172,6 +271,35 @@ def find_user(name: str, config: dict) -> Optional[dict]:
return users[0]
def find_user(name: str, config: dict) -> Optional[dict]:
"""搜索飞书用户
策略:
1. 如果输入是邮箱/手机号 → 直接用 batch_get_id最快
2. 否则 → 遍历部门查找(需要 contact:department.base:readonly
3. 如果部门遍历也失败 → 提示用户改用邮箱/手机号
"""
print(f" 搜索用户:{name} ...", file=sys.stderr)
# 方法 1邮箱/手机号直接查找
user = _find_user_by_contact(name, config)
if user:
return user
# 方法 2部门遍历
user = _find_user_by_department(name, config)
if user:
return user
# 都失败
print(f"\n ❌ 未能找到用户 {name}", file=sys.stderr)
print(f" 建议:", file=sys.stderr)
print(f" 1. 确认已开通 contact:department.base:readonly 权限", file=sys.stderr)
print(f" 2. 改用邮箱搜索:--name user@company.com", file=sys.stderr)
print(f" 3. 改用手机号搜索:--name +8613800138000", file=sys.stderr)
return None
# ─── 消息记录 ─────────────────────────────────────────────────────────────────
def get_chats_with_user(user_open_id: str, config: dict) -> list: