diff --git a/tools/feishu_auto_collector.py b/tools/feishu_auto_collector.py index 6fe9adc..89cc250 100644 --- a/tools/feishu_auto_collector.py +++ b/tools/feishu_auto_collector.py @@ -63,7 +63,8 @@ def setup_config() -> None: print(" im:chat.members:readonly 读取群成员") print() print(" 用户类:") - print(" contact:user.base:readonly 搜索用户") + print(" contact:user.base:readonly 读取用户基本信息") + print(" contact:department.base:readonly 遍历部门查找用户(按姓名搜索必需)") print() print(" 文档类:") print(" docs:doc:readonly 读取文档") @@ -133,36 +134,134 @@ def api_post(path: str, body: dict, config: dict) -> dict: # ─── 用户搜索 ───────────────────────────────────────────────────────────────── -def find_user(name: str, config: dict) -> Optional[dict]: - """通过姓名搜索飞书用户""" - print(f" 搜索用户:{name} ...", file=sys.stderr) +def _find_user_by_contact(name: str, config: dict) -> Optional[dict]: + """通过邮箱或手机号查找用户(使用 tenant_access_token)""" + # 判断输入类型 + emails, mobiles = [], [] + if "@" in name: + emails = [name] + elif name.replace("+", "").replace("-", "").isdigit(): + mobiles = [name] + else: + return None # 不是邮箱或手机号,跳过 - data = api_get( - "/search/v1/user", - {"query": name, "page_size": 10}, - config, - ) + body = {} + if emails: + body["emails"] = emails + if mobiles: + body["mobiles"] = mobiles + data = api_post("/contact/v3/users/batch_get_id", body, config) if data.get("code") != 0: - print(f" 搜索用户失败(code={data.get('code')}):{data.get('msg')}", file=sys.stderr) + print(f" 邮箱/手机号查找失败(code={data.get('code')}):{data.get('msg')}", file=sys.stderr) return None - users = data.get("data", {}).get("results", []) + user_list = data.get("data", {}).get("user_list", []) + for item in user_list: + user_id = item.get("user_id") + if user_id: + # 获取用户详情 + detail = api_get(f"/contact/v3/users/{user_id}", {"user_id_type": "user_id"}, config) + if detail.get("code") == 0: + user_data = detail.get("data", {}).get("user", {}) + print(f" 找到用户:{user_data.get('name', user_id)}", file=sys.stderr) + return user_data + # 如果详情拉不到,返回基本信息 + return {"user_id": user_id, "open_id": item.get("open_id", ""), "name": name} + + return None + + +def _find_user_by_department(name: str, config: dict) -> Optional[dict]: + """遍历部门查找用户(使用 tenant_access_token,需要 contact:department.base:readonly)""" + print(f" 通过部门遍历查找 {name} ...", file=sys.stderr) + + # 递归获取所有部门 ID + dept_ids = ["0"] # 0 = 根部门 + queue = ["0"] + while queue: + parent_id = queue.pop(0) + data = api_get( + f"/contact/v3/departments/{parent_id}/children", + {"page_size": 50, "fetch_child": False}, + config, + ) + if data.get("code") != 0: + if parent_id == "0": + print(f" 部门遍历失败(code={data.get('code')}):{data.get('msg')}", file=sys.stderr) + print(f" 请确认已开通 contact:department.base:readonly 权限", file=sys.stderr) + return None + continue + + children = data.get("data", {}).get("items", []) + for child in children: + child_id = child.get("department_id", "") + if child_id: + dept_ids.append(child_id) + queue.append(child_id) + + print(f" 共 {len(dept_ids)} 个部门,搜索用户 ...", file=sys.stderr) + + # 在每个部门中查找用户 + matches = [] + for dept_id in dept_ids: + page_token = None + while True: + params = {"department_id": dept_id, "page_size": 50} + if page_token: + params["page_token"] = page_token + + data = api_get("/contact/v3/users/find_by_department", params, config) + if data.get("code") != 0: + break + + users = data.get("data", {}).get("items", []) + for u in users: + uname = u.get("name", "") + en_name = u.get("en_name", "") + if name in uname or name in en_name or uname == name or en_name == name: + matches.append(u) + + if not data.get("data", {}).get("has_more"): + break + page_token = data.get("data", {}).get("page_token") + + if len(matches) >= 10: + break # 够了 + + return _select_user(matches, name) + + +def _select_user(users: list, name: str) -> Optional[dict]: + """从候选列表中选择用户""" if not users: print(f" 未找到用户:{name}", file=sys.stderr) return None + # 去重(按 user_id) + seen = set() + deduped = [] + for u in users: + uid = u.get("user_id", u.get("open_id", id(u))) + if uid not in seen: + seen.add(uid) + deduped.append(u) + users = deduped + if len(users) == 1: u = users[0] - print(f" 找到用户:{u.get('name')}({u.get('department_path', [''])[0]})", file=sys.stderr) + dept_ids = u.get("department_ids", []) + print(f" 找到用户:{u.get('name')}(部门:{dept_ids[0] if dept_ids else ''})", file=sys.stderr) return u # 多个结果,让用户选择 print(f"\n 找到 {len(users)} 个结果,请选择:") for i, u in enumerate(users): - dept = u.get("department_path", [""]) - dept_str = dept[0] if dept else "" - print(f" [{i+1}] {u.get('name')} {dept_str} {u.get('user_id', '')}") + dept_ids = u.get("department_ids", []) + dept_str = dept_ids[0] if dept_ids else "" + en = u.get("en_name", "") + label = f"{u.get('name', '')} ({en})" if en else u.get("name", "") + print(f" [{i+1}] {label} dept={dept_str} uid={u.get('user_id', '')}") choice = input("\n 选择编号(默认 1):").strip() or "1" try: @@ -172,6 +271,35 @@ def find_user(name: str, config: dict) -> Optional[dict]: return users[0] +def find_user(name: str, config: dict) -> Optional[dict]: + """搜索飞书用户 + + 策略: + 1. 如果输入是邮箱/手机号 → 直接用 batch_get_id(最快) + 2. 否则 → 遍历部门查找(需要 contact:department.base:readonly) + 3. 如果部门遍历也失败 → 提示用户改用邮箱/手机号 + """ + print(f" 搜索用户:{name} ...", file=sys.stderr) + + # 方法 1:邮箱/手机号直接查找 + user = _find_user_by_contact(name, config) + if user: + return user + + # 方法 2:部门遍历 + user = _find_user_by_department(name, config) + if user: + return user + + # 都失败 + print(f"\n ❌ 未能找到用户 {name}", file=sys.stderr) + print(f" 建议:", file=sys.stderr) + print(f" 1. 确认已开通 contact:department.base:readonly 权限", file=sys.stderr) + print(f" 2. 改用邮箱搜索:--name user@company.com", file=sys.stderr) + print(f" 3. 改用手机号搜索:--name +8613800138000", file=sys.stderr) + return None + + # ─── 消息记录 ───────────────────────────────────────────────────────────────── def get_chats_with_user(user_open_id: str, config: dict) -> list: