85 lines
3.6 KiB
Python
85 lines
3.6 KiB
Python
# 分析每个CVE源自哪个应用程序、什么原因、什么后果等信息
|
||
|
||
from os import truncate
|
||
import re
|
||
import json
|
||
from openai import OpenAI
|
||
from typing import Dict, List, Any
|
||
|
||
|
||
def process_result(data: Any) -> str:
|
||
# target_key = "choices.0.message.content"
|
||
parts = ["choices", "0", "message", "content"]
|
||
try:
|
||
for part in parts:
|
||
if isinstance(data, list):
|
||
data = data[int(part)]
|
||
else:
|
||
data = data.get(part, None)
|
||
if data is None:
|
||
break
|
||
return data
|
||
except (KeyError, IndexError, TypeError):
|
||
return "N/A"
|
||
return "N/A"
|
||
|
||
|
||
def analyze_with_ai(data: str) -> str:
|
||
prompt = """
|
||
你是一个计算机容器安全领域的专家,能够根据CVE描述分析出CVE产生于哪个程序、有何影响后果等信息。
|
||
下面我会发给你一段文本,第一行是CVE的编号与文件位置信息,后续是这个CVE的详细描述。
|
||
请分析我发送给你的CVE详细描述内容,并进行分析,我有如下要求:
|
||
1. 分析这个CVE信息是否和这个CVE描述示例是相同的成因,即信息在构建时被写入镜像,所有用户共用同一份信息;
|
||
2. 如果不满足条件1,你只需返回"N/A"即可,无需任何多余信息;
|
||
3. 如果这个CVE满足条件1,那么你只需要直接返回一个json,这个json样例如下:
|
||
{"Conatiner":"[存在CVE的容器镜像]","CVE_Reason":"[容器泄漏或未妥善保护的文件、密钥等敏感信息类型]","CVE_Consequence":"[该CVE的后果,以及严重程度]"}
|
||
请牢记,你必须只返回json内容,并且尽量使用中文。
|
||
"""
|
||
|
||
client = OpenAI(
|
||
api_key="sk-5ec7751941974a9cb5855f746fe45a62",
|
||
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
|
||
)
|
||
try:
|
||
completion = client.chat.completions.create(
|
||
# 模型列表:https://help.aliyun.com/zh/model-studio/getting-started/models
|
||
model="qwen-plus",
|
||
messages=[
|
||
{"role": "system", "content": prompt},
|
||
{"role": "user", "content": data},
|
||
],
|
||
# Qwen3模型通过enable_thinking参数控制思考过程(开源版默认True,商业版默认False)
|
||
# 使用Qwen3开源版模型时,若未启用流式输出,请将下行取消注释,否则会报错
|
||
# extra_body={"enable_thinking": False},
|
||
)
|
||
return process_result(json.loads(completion.model_dump_json()))
|
||
except Exception as e:
|
||
print(f"调用大模型失败: {str(e)}")
|
||
return "N/A"
|
||
return "N/A"
|
||
|
||
|
||
if __name__ == "__main__":
|
||
pattern = r"\./data/.*x/CVE.*?\.json"
|
||
regex = re.compile(pattern)
|
||
|
||
with open("keyword_filter_11-20.log", "r", encoding="utf-8") as file:
|
||
text = file.read()
|
||
matches = list(regex.finditer(text))
|
||
|
||
log = open("./info_reveal_result/analysis_result_11-20.log", "w", encoding="utf-8")
|
||
|
||
# 如果没有找到任何匹配项
|
||
if not matches:
|
||
print("No match found.")
|
||
else:
|
||
for i in range(len(matches)):
|
||
start = matches[i].start()
|
||
# 如果不是最后一个匹配项,则end是下一个匹配项的起始位置;否则为文本末尾
|
||
end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
|
||
block = text[start:end].strip()
|
||
ans = analyze_with_ai(block)
|
||
if ans == "N/A":
|
||
continue
|
||
log.write(f"cve: {block}\n\nanalysis: {ans}\n\n")
|