import os
import json
import glob
from typing import Dict, List, Any


class ContentParser:
    """Extract one value from every JSON file in a directory tree.

    The value is addressed by a dot-separated path (``target_keys``).
    Every non-empty extracted value is collected, and values that mention
    one of the container-related keywords are also written to ``name.log``.

    The instance owns an open log file; call :meth:`close` when done, or
    use the parser as a context manager.
    """

    # Keywords (lower-case) that mark an extracted text as "related".
    _KEYWORDS = ("docker", "namespace", "cgroup")

    def __init__(
        self,
        target_keys: str,
    ):
        """Initialise the parser and open the log file.

        :param target_keys: dot-separated path of the value to extract,
            e.g. ``"containers.cna.descriptions.0.value"``. Segments that
            are applied to a list are interpreted as integer indices.
        """
        self.target_keys = target_keys
        # Opened in "w" mode: an existing name.log in the current working
        # directory is truncated as soon as a parser is constructed.
        self.log = open("name.log", "w", encoding="utf-8")

    def close(self) -> None:
        """Flush and close the log file. Safe to call more than once."""
        log = getattr(self, "log", None)
        if log is not None and not log.closed:
            log.close()

    def __enter__(self) -> "ContentParser":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def process_directory(
        self, directory: str, recursive: bool = True
    ) -> List[Dict[str, str]]:
        """Process every ``.json`` file found under *directory*.

        :param directory: path of the directory to scan
        :param recursive: whether sub-directories are scanned as well
        :return: one ``{"file": path, "description": text}`` entry per file
            that yielded a non-empty value, ordered by file path
        """
        results: List[Dict[str, str]] = []

        pattern = "**/*" if recursive else "*"
        matches = glob.glob(os.path.join(directory, pattern), recursive=recursive)
        # glob order depends on the filesystem; sort so that the log and the
        # returned list are reproducible.
        for filepath in sorted(matches):
            if not os.path.isfile(filepath):
                continue
            if os.path.splitext(filepath)[1] != ".json":
                continue
            # Best-effort batch processing: one unreadable or malformed file
            # is reported and skipped, it must not abort the whole run.
            try:
                result = self._process_json(filepath)
                if result:
                    results.append({"file": filepath, "description": result})
                    if self._check_related(result):
                        self.log.write(f"{filepath}\n{result}\n\n")
                        # NOTE: an AI analysis step (_analyze_with_ai) was
                        # stubbed out here in the original code.
            except Exception as e:
                print(f"处理文件 {filepath} 时出错: {e}")
        return results

    def _process_json(self, filepath: str) -> str:
        """Load one JSON file and return the value at ``target_keys``.

        :raises OSError: if the file cannot be opened
        :raises ValueError: if the file is not valid UTF-8 encoded JSON
        """
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
        return self._extract_values(data)

    def _extract_values(self, data: Dict) -> str:
        """Walk *data* along ``target_keys`` and return the value as a string.

        Returns ``""`` whenever the path cannot be followed: a key is
        missing or maps to ``None``, a list index is out of range or not an
        integer, or an intermediate value is neither a dict nor a list.
        """
        value: Any = data
        for part in self.target_keys.split("."):
            if isinstance(value, list):
                try:
                    value = value[int(part)]
                except (ValueError, IndexError):
                    return ""
            elif isinstance(value, dict):
                value = value.get(part)
            else:
                # Scalars cannot be descended into.
                return ""
            if value is None:
                return ""
        return str(value)

    def _check_related(self, text: str) -> bool:
        """Return True if *text* contains any keyword, case-insensitively."""
        lowered = text.lower()
        return any(keyword in lowered for keyword in self._KEYWORDS)


def main() -> None:
    """Run the parser with the built-in configuration."""
    # Configuration parameters
    config = {
        "target_keys": "containers.cna.descriptions.0.value",
        "directory": "./data",
        "recursive": True,
    }

    # The context manager guarantees name.log is flushed and closed.
    with ContentParser(target_keys=config["target_keys"]) as parser:
        parser.process_directory(
            directory=config["directory"], recursive=config["recursive"]
        )


if __name__ == "__main__":
    main()