feat(WIP): 添加项目架构和callback接口

This commit is contained in:
sleepwithoutbz
2025-09-26 01:11:55 +08:00
committed by Ubuntu
parent d02f901519
commit 11cd17cbf4
8 changed files with 96 additions and 16 deletions

6
.gitignore vendored
View File

@@ -1,2 +1,4 @@
__pycache__
.kds_venv
__pycache__/
.kds_venv/
.env

9
deploy.sh Executable file
View File

@@ -0,0 +1,9 @@
#! /usr/bin/bash
echo "Deploying..."
git push
# 连接ssh服务器tencent进入特定目录执行指令
ssh tencent "cd /home/ubuntu/kds_project && git pull && sudo systemctl restart kds"
echo "Deploy finished!"

0
kds/__init__.py Normal file
View File

8
kds/main.py Normal file
View File

@@ -0,0 +1,8 @@
from typing import Union
from fastapi import FastAPI
from .routers import callback, root
app = FastAPI()
app.include_router(callback.router)
app.include_router(root.router)

0
kds/routers/__init__.py Normal file
View File

67
kds/routers/callback.py Normal file
View File

@@ -0,0 +1,67 @@
from fastapi import FastAPI, Request, HTTPException, Query, APIRouter
from fastapi.responses import JSONResponse, PlainTextResponse
import hashlib
import xml.etree.ElementTree as ET
router = APIRouter()
APP_ID = "wx33726a2d409faa94"
APP_SECRET = "59b7919d45d4631ab55bf418abb3e8bf"
# TODO: 接收消息推送使用安全模式
TOKEN = "sleepwithoutbz"
ENCODING_AES_KEY = "njMce1oYKW7V11yug9Qjm689DYzXQceFedaeRDHSOsL"
@router.get("/callback", response_class=PlainTextResponse)
async def wechat_callback(
signature: str = Query(..., description="微信加密签名"),
timestamp: str = Query(..., description="时间戳"),
nonce: str = Query(..., description="随机数"),
echostr: str = Query(..., description="验证字符串"),
):
"""
微信验证回调接口
1. 将token/timestamp/nonce按字典序排序
2. 拼接后sha1加密
3. 与signature比对验证
"""
# 1. 参数排序
params = sorted([TOKEN, timestamp, nonce])
# 2. 拼接字符串并加密
sign_str = "".join(params).encode("utf-8")
calculated_signature = hashlib.sha1(sign_str).hexdigest()
# 3. 验证签名
if calculated_signature == signature:
# 验证成功返回原始echostr
return echostr
else:
# 验证失败返回错误
raise HTTPException(status_code=403, detail="Invalid signature")
@router.post("/callback")
async def wechat_callback_post(request: Request):
"""
微信消息推送接口
"""
# 1. 获取请求数据
xml_data = await request.body()
# 2. 解析xml数据
root = ET.fromstring(xml_data)
# 3. 获取消息类型
msg_type = root.find("MsgType").text
# 4. 处理消息
if msg_type == "text":
# 4.1. 处理文本消息
content = root.find("Content").text
return JSONResponse(content={"Content": content})
else:
# 4.2. 处理其他类型消息
return JSONResponse(content={"Content": ""})

8
kds/routers/root.py Normal file
View File

@@ -0,0 +1,8 @@
from fastapi import APIRouter
router = APIRouter()
@router.get("/")
async def home():
return {"Hello": "World"}

14
main.py
View File

@@ -1,14 +0,0 @@
from typing import Union
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
async def read_root():
return {"Hello": "World"}
@app.get("/items/{item_id}")
async def read_item(item_id: int, q: Union[str, None] = None):
return {"item_id": item_id, "q": q}