92 lines
3.0 KiB
Python
92 lines
3.0 KiB
Python
import os
|
|
import json
|
|
import glob
|
|
from typing import Dict, List, Any
|
|
|
|
|
|
class ContentParser:
    """Extract one nested value from every JSON file under a directory.

    The value is addressed by a dotted path (``target_keys``). Extracted
    values that mention a container-related keyword are additionally
    written to a log file.

    The instance owns an open log file handle (``self.log``). Call
    :meth:`close` when done, or use the instance as a context manager.
    """

    # Lower-case keywords that mark an extracted text as container-related.
    _RELATED_KEYWORDS = ("docker", "namespace", "cgroup")

    def __init__(
        self,
        target_keys: str,
        log_path: str = "name.log",
    ):
        """
        Initialise the parser.

        :param target_keys: dotted path of the value to extract, e.g.
            ``"containers.cna.descriptions.0.value"``. Segments applied to a
            list are converted to integer indexes.
        :param log_path: file that receives the matching entries. It is
            opened for writing immediately, truncating any previous content.
        """
        self.target_keys = target_keys
        self.log = open(log_path, "w", encoding="utf-8")

    def close(self) -> None:
        """Close the log file. Safe to call more than once."""
        if not self.log.closed:
            self.log.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def process_directory(
        self, directory: str, recursive: bool = True
    ) -> List[Dict[str, str]]:
        """
        Process every ``.json`` file found in a directory.

        :param directory: path of the directory to scan
        :param recursive: whether sub-directories are scanned as well
        :return: one ``{"file": path, "description": text}`` dict per file
            that yielded a non-empty value
        """
        results: List[Dict[str, str]] = []

        pattern = "**/*" if recursive else "*"
        search = os.path.join(directory, pattern)
        for filepath in glob.glob(search, recursive=recursive):
            if not os.path.isfile(filepath):
                continue
            if os.path.splitext(filepath)[1] != ".json":
                continue
            # Per-file boundary: one unreadable or malformed file must not
            # abort the whole scan, so the error is reported and skipped.
            try:
                result = self._process_json(filepath)
                if result:
                    results.append({"file": filepath, "description": result})
                    if self._check_related(result):
                        self.log.write(f"{filepath}\n{result}\n\n")
            except Exception as e:
                print(f"处理文件 {filepath} 时出错: {str(e)}")

        # Make the matches visible on disk even if close() is never called.
        if not self.log.closed:
            self.log.flush()
        return results

    def _process_json(self, filepath: str) -> str:
        """Load a JSON file and return the value found at ``target_keys``."""
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
        return self._extract_values(data)

    def _extract_values(self, data: Any) -> str:
        """Walk ``data`` along the dotted ``target_keys`` path.

        :return: ``str()`` of the value found, or ``""`` when any segment is
            missing, out of range, not applicable to the current value, or
            when the value is ``None``.
        """
        value = data
        for part in self.target_keys.split("."):
            try:
                if isinstance(value, list):
                    value = value[int(part)]
                else:
                    value = value.get(part)
            except (KeyError, IndexError, TypeError, ValueError, AttributeError):
                # ValueError: non-numeric segment used on a list.
                # AttributeError: path continues through a scalar value.
                return ""
            if value is None:
                # Missing key or JSON null: report "no value", not "None".
                return ""
        return str(value)

    def _check_related(self, text: str) -> bool:
        """Return True if ``text`` contains any keyword, ignoring case."""
        lowered = text.lower()
        return any(keyword in lowered for keyword in self._RELATED_KEYWORDS)
|
|
|
|
|
|
if __name__ == "__main__":
    # Run configuration: dotted path of the value to extract, the directory
    # to scan, and whether sub-directories are included.
    CONFIG = {
        "target_keys": "containers.cna.descriptions.0.value",
        "directory": "./data",
        "recursive": True,
    }

    # Create the parser instance (this opens its log file).
    parser = ContentParser(target_keys=CONFIG["target_keys"])

    # Start processing.
    try:
        parser.process_directory(
            directory=CONFIG["directory"], recursive=CONFIG["recursive"]
        )
    finally:
        # The constructor opens the log file; close it so buffered entries
        # reach disk and the handle is released even if processing fails.
        parser.log.close()
|