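"""Scan a directory of CVE JSON records, extract the description text addressed by a
dot-separated key path, and log the paths of records whose text mentions container-related
keywords (docker, namespace, cgroup). An optional LLM-based check is included but disabled."""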
import os
import json
import glob
from openai import OpenAI
from typing import Dict


class ContentParser:
    def __init__(
        self,
        target_keys: str,
        api_key: str,
        base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
        model: str = "qwen-plus",
    ):
        """
        Initialize the parser.

        :param target_keys: dot-separated path of the keys to extract, e.g. "a.b.0.c"
        :param api_key: API key for the LLM service
        :param base_url: OpenAI-compatible endpoint of the LLM service
        :param model: name of the model to use
        """
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url,
        )
        self.target_keys = target_keys
        self.model = model
        # Paths of matching files are appended to this log as they are found
        self.log = open("name.log", "w", encoding="utf-8")

    def process_directory(self, directory: str, recursive: bool = True):
        """
        Process all files in a directory.

        :param directory: path of the directory to process
        :param recursive: whether to recurse into subdirectories
        """
        results = []

        # Traverse the directory and handle every JSON file found
        pattern = "**/*" if recursive else "*"
        for filepath in glob.glob(
            os.path.join(directory, pattern), recursive=recursive
        ):
            if os.path.isfile(filepath):
                _, ext = os.path.splitext(filepath)
                if ext == ".json":
                    try:
                        result = self._process_json(filepath)
                        if result:
                            results.append({"file": filepath, "description": result})
                            if self._check_related(result):
                                self.log.write(f"{filepath}\n")
                                # self._analyze_with_ai(result)
                    except Exception as e:
                        print(f"Error while processing file {filepath}: {e}")
        return results

    def _process_json(self, filepath: str) -> str:
        """Load a JSON file and extract the target value from it."""
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
        return self._extract_values(data)

    def _extract_values(self, data: Dict) -> str:
        """Extract the target value from the data, following the dot-separated key path."""
        parts = self.target_keys.split(".")
        value = data
        try:
            for part in parts:
                if isinstance(value, list):
                    # Numeric path components index into lists
                    value = value[int(part)]
                else:
                    value = value.get(part, None)
                if value is None:
                    break
            return str(value) if value is not None else ""
        except (KeyError, IndexError, TypeError, ValueError):
            return ""

    def _check_related(self, text: str) -> bool:
        """Return True if the text mentions any container-related keyword."""
        keywords = ["docker", "namespace", "cgroup"]
        text = text.lower()
        return any(keyword in text for keyword in keywords)

    def _analyze_with_ai(self, data: str):
        """Ask the LLM whether the extracted CVE data is container/isolation related."""
        prompt = """
        Analyze the data the user extracted from a file and produce a summary verdict.
        Requirements:
        1. Determine whether this CVE is related to Linux or the kernel.
        2. If it is, determine whether it relates to namespace, cgroup, container, or isolation.
        3. If conditions 1 and 2 are not both satisfied, output exactly "Unrelated CVE" and nothing else.
        4. If conditions 1 and 2 are both satisfied, output exactly "Suspicious CVE" and nothing else.
        """

        try:
            completion = self.client.chat.completions.create(
                # Model list: https://help.aliyun.com/zh/model-studio/getting-started/models
                model=self.model,
                messages=[
                    {"role": "system", "content": prompt},
                    {
                        "role": "user",
                        "content": data,
                    },
                ],
                # Qwen3 models control their thinking process with the enable_thinking
                # parameter (open-source default: True, commercial default: False).
                # When using an open-source Qwen3 model without streaming output,
                # uncomment the next line, otherwise the request will fail.
                # extra_body={"enable_thinking": False},
            )
            print(completion.model_dump_json())
        except Exception as e:
            print(f"LLM call failed: {e}")


if __name__ == "__main__":
    # Configuration
    CONFIG = {
        "target_keys": "containers.cna.descriptions.0.value",
        # Read the API key from the environment instead of hard-coding it
        "api_key": os.getenv("DASHSCOPE_API_KEY", ""),
        "directory": "./data",
        "recursive": True,
    }
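    # target_keys is a dot-separated path into each JSON record: with the value above, a
    # CVE record shaped like {"containers": {"cna": {"descriptions": [{"value": "..."}]}}}
    # resolves to the description text; numeric components index into lists.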

    # Create the parser instance
    parser = ContentParser(target_keys=CONFIG["target_keys"], api_key=CONFIG["api_key"])

    # Start processing
    parser.process_directory(
        directory=CONFIG["directory"], recursive=CONFIG["recursive"]
    )