Files
cve/ai_analysis.py
sleepwithoutbz 79ae3fec86 Init commit.
2025-05-27 15:15:24 +08:00

130 lines
4.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import glob
from openai import OpenAI
from typing import Dict, List, Any
class ContentParser:
    """Scan JSON files for one dotted-path value and flag keyword matches.

    For every ``.json`` file under a directory the parser extracts the value
    addressed by ``target_keys`` (for example
    ``"containers.cna.descriptions.0.value"``), collects it, and writes the
    file path to a log file when the value mentions one of ``_KEYWORDS``.

    The instance owns an open log file; call :meth:`close` (or use the
    instance as a context manager) when done.
    """

    # Substrings, matched case-insensitively, that mark a value as related.
    _KEYWORDS = ("docker", "namespace", "cgroup")

    # System prompt sent with every AI analysis request (runtime text, kept
    # in the original language on purpose).
    _ANALYSIS_PROMPT = (
        "\n"
        "请分析下面用户从文件中提取的数据,给出总结报告:\n"
        "分析要求:\n"
        "1. 分析这个CVE信息是否与Linux、Kernel相关\n"
        "2. 如果满足与Linux、Kernel相关分析是否与namespace、cgroup、container或者容器、隔离相关\n"
        "3. 如果不满足1、2两个条件直接输出“非相关CVE”不需要附带任何其他内容\n"
        "4. 如果满足1、2两个条件直接输出“可疑CVE”不需要附带任何其他内容\n"
    )

    def __init__(
        self,
        target_keys: str,
        api_key: str,
        base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1",
        model: str = "qwen-plus",
        log_path: str = "name.log",
    ):
        """
        Initialise the parser.

        :param target_keys: dot-separated path to the value to extract; a
            numeric segment indexes into a list
        :param api_key: API key for the large-model service
        :param base_url: base URL of the OpenAI-compatible endpoint
        :param model: model name used by ``_analyze_with_ai``; the default is
            the model that was previously hard-coded in that method
        :param log_path: file that receives the paths of matching files; it
            is truncated when the parser is created
        """
        self.client = OpenAI(
            api_key=api_key,
            base_url=base_url,
        )
        self.target_keys = target_keys
        self.model = model
        self.log = open(log_path, "w", encoding="utf-8")

    def __enter__(self):
        """Return the parser itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the log file when the ``with`` block ends."""
        self.close()

    def close(self) -> None:
        """Close the log file; calling this more than once is harmless."""
        self.log.close()

    def process_directory(
        self, directory: str, recursive: bool = True
    ) -> List[Dict[str, str]]:
        """
        Process every ``.json`` file in a directory.

        :param directory: path of the directory to scan
        :param recursive: whether sub-directories are scanned as well
        :return: one ``{"file": path, "description": value}`` dict per file
            that yielded a non-empty value
        """
        results: List[Dict[str, str]] = []
        pattern = "**/*" if recursive else "*"
        matches = glob.glob(os.path.join(directory, pattern), recursive=recursive)
        # Sorted so the result list and the log have a reproducible order.
        for filepath in sorted(matches):
            if not os.path.isfile(filepath):
                continue
            if os.path.splitext(filepath)[1] != ".json":
                continue
            try:
                description = self._process_json(filepath)
                if not description:
                    continue
                results.append({"file": filepath, "description": description})
                if self._check_related(description):
                    self.log.write(f"{filepath}\n")
                    # self._analyze_with_ai(description)
            except Exception as e:
                # Best-effort scan: one unreadable file must not stop the run.
                print(f"处理文件 {filepath} 时出错: {e}")
        self.log.flush()
        return results

    def _process_json(self, filepath: str) -> str:
        """Load one JSON file and return the value found at ``target_keys``."""
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
        return self._extract_values(data)

    def _extract_values(self, data: Dict) -> str:
        """Walk ``data`` along ``target_keys`` and return the value as a string.

        Returns ``""`` when the path cannot be followed: a missing key, a JSON
        null, an out-of-range or non-numeric list index, or a scalar where a
        container was expected.
        """
        value: Any = data
        for part in self.target_keys.split("."):
            if isinstance(value, list):
                try:
                    value = value[int(part)]
                except (IndexError, ValueError):
                    return ""
            elif isinstance(value, dict):
                value = value.get(part)
            else:
                # Path continues but the current node is a scalar.
                return ""
            if value is None:
                return ""
        return str(value)

    def _check_related(self, text: str) -> bool:
        """Return True when ``text`` contains any keyword, ignoring case."""
        lowered = text.lower()
        return any(keyword in lowered for keyword in self._KEYWORDS)

    def _analyze_with_ai(self, data: str):
        """Send ``data`` to the chat model and print the raw JSON response.

        Failures are reported on stdout and not re-raised.
        """
        try:
            completion = self.client.chat.completions.create(
                # Model list: https://help.aliyun.com/zh/model-studio/getting-started/models
                model=self.model,
                messages=[
                    {"role": "system", "content": self._ANALYSIS_PROMPT},
                    {
                        "role": "user",
                        "content": data,
                    },
                ],
                # Qwen3 models control their thinking phase through the
                # enable_thinking parameter (open-source default True,
                # commercial default False). With an open-source Qwen3 model
                # and no streaming, uncomment the next line or the call fails.
                # extra_body={"enable_thinking": False},
            )
            print(completion.model_dump_json())
        except Exception as e:
            print(f"调用大模型失败: {str(e)}")
if __name__ == "__main__":
    # Run configuration. The API key comes from the environment so that no
    # secret lives in the source tree; any key previously committed here
    # should be treated as leaked and revoked.
    CONFIG = {
        "target_keys": "containers.cna.descriptions.0.value",
        "api_key": os.environ.get("DASHSCOPE_API_KEY", ""),
        "directory": "./data",
        "recursive": True,
    }
    if not CONFIG["api_key"]:
        raise SystemExit(
            "DASHSCOPE_API_KEY is not set; export it before running this script."
        )
    # Create the parser instance.
    parser = ContentParser(target_keys=CONFIG["target_keys"], api_key=CONFIG["api_key"])
    # Start processing; always release the parser's log file afterwards.
    try:
        parser.process_directory(
            directory=CONFIG["directory"], recursive=CONFIG["recursive"]
        )
    finally:
        parser.log.close()