-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Expand file tree
/
Copy pathauth.py
More file actions
100 lines (83 loc) · 3.43 KB
/
auth.py
File metadata and controls
100 lines (83 loc) · 3.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import asyncio
import datetime
import hmac

import jwt
from quart import request

from astrbot import logger
from astrbot.core import DEMO_MODE
from astrbot.core.db import BaseDatabase

from .route import Response, Route, RouteContext
class AuthRoute(Route):
    """Dashboard authentication routes: login and account editing.

    Registers ``POST /auth/login`` and ``POST /auth/account/edit``.
    Credentials and the JWT secret are read from the ``dashboard`` section
    of ``self.config``.

    NOTE(review): ``self.config`` is not assigned in this class; presumably
    it is set up by ``Route.__init__`` from the context -- confirm.
    """

    # Default credentials; a successful login with exactly these values
    # triggers the "change your password" hint (outside demo mode).
    _DEFAULT_USERNAME = "astrbot"
    _DEFAULT_PASSWORD = "77b90590a8945a7d36c963981a307dc9"

    def __init__(self, context: RouteContext, db: BaseDatabase) -> None:
        """Store the database handle and register this route's endpoints.

        Args:
            context: Route context forwarded to ``Route.__init__``.
            db: Database used to migrate webchat data on username change.
        """
        super().__init__(context)
        self.db = db
        self.routes = {
            "/auth/login": ("POST", self.login),
            "/auth/account/edit": ("POST", self.edit_account),
        }
        self.register_routes()

    @staticmethod
    async def _read_json_object() -> dict:
        """Return the request's JSON body, or ``{}`` if it is not an object.

        ``request.json`` can yield ``None`` (e.g. non-JSON content type) or a
        non-dict JSON value; normalising here lets handlers use ``.get``
        instead of failing with ``TypeError``/``KeyError``.
        """
        post_data = await request.json
        return post_data if isinstance(post_data, dict) else {}

    @staticmethod
    def _credentials_equal(supplied, expected) -> bool:
        """Compare a client-supplied value with the stored one.

        Strings are compared with ``hmac.compare_digest`` so the comparison
        time does not depend on where the values first differ. A missing
        value (``None``) never matches.
        """
        if isinstance(supplied, str) and isinstance(expected, str):
            # Compare as bytes: compare_digest rejects non-ASCII ``str``.
            # ``surrogatepass`` keeps encoding from raising on lone surrogates.
            return hmac.compare_digest(
                supplied.encode("utf-8", "surrogatepass"),
                expected.encode("utf-8", "surrogatepass"),
            )
        # Non-string values keep plain equality semantics.
        return supplied is not None and supplied == expected

    async def login(self):
        """Check posted credentials and issue a JWT on success.

        Reads ``username`` and ``password`` from the JSON request body and
        compares them with the configured dashboard credentials.

        Returns:
            The ``__dict__`` of a ``Response``. On success it is built with
            ``ok`` and carries ``token``, ``username`` and
            ``change_pwd_hint``; otherwise it is built with ``error`` and is
            returned after a 3-second delay.

        Raises:
            ValueError: If no JWT secret is configured (see ``generate_jwt``).
        """
        username = self.config["dashboard"]["username"]
        password = self.config["dashboard"]["password"]
        post_data = await self._read_json_object()

        # Evaluate both checks unconditionally so a wrong username does not
        # skip the password comparison.
        username_ok = self._credentials_equal(post_data.get("username"), username)
        password_ok = self._credentials_equal(post_data.get("password"), password)

        if username_ok and password_ok:
            change_pwd_hint = (
                username == self._DEFAULT_USERNAME
                and password == self._DEFAULT_PASSWORD
                and not DEMO_MODE
            )
            if change_pwd_hint:
                logger.warning("为了保证安全,请尽快修改默认密码。")
            return (
                Response()
                .ok(
                    {
                        "token": self.generate_jwt(username),
                        "username": username,
                        "change_pwd_hint": change_pwd_hint,
                    },
                )
                .__dict__
            )

        # Delay every failed attempt (presumably to slow password guessing).
        await asyncio.sleep(3)
        return Response().error("用户名或密码错误").__dict__

    async def edit_account(self):
        """Change the dashboard username and/or password.

        Reads ``password`` (the current one), and optionally
        ``new_password``, ``confirm_password`` and ``new_username`` from the
        JSON request body. Refused entirely when ``DEMO_MODE`` is set.

        Returns:
            The ``__dict__`` of a ``Response``: an error if the operation is
            not permitted or validation fails, otherwise a success response
            after the configuration has been saved.
        """
        if DEMO_MODE:
            return (
                Response()
                .error("You are not permitted to do this operation in demo mode")
                .__dict__
            )

        password = self.config["dashboard"]["password"]
        post_data = await self._read_json_object()

        if not self._credentials_equal(post_data.get("password"), password):
            return Response().error("原密码错误").__dict__

        new_pwd = post_data.get("new_password", None)
        new_username = post_data.get("new_username", None)
        if not new_pwd and not new_username:
            return Response().error("新用户名和新密码不能同时为空").__dict__

        # A new password must be confirmed by a matching second entry.
        if new_pwd and post_data.get("confirm_password", None) != new_pwd:
            return Response().error("两次输入的新密码不一致").__dict__

        # Migrate webchat user data BEFORE touching the config: if the
        # migration raises, the in-memory credentials stay unchanged and
        # still match what is on disk and in the database.
        old_username = self.config["dashboard"]["username"]
        if new_username and new_username != old_username:
            await self.db.migrate_user_webchat_data(old_username, new_username)

        if new_pwd:
            self.config["dashboard"]["password"] = new_pwd
        if new_username:
            self.config["dashboard"]["username"] = new_username
        self.config.save_config()

        return Response().ok(None, "修改成功").__dict__

    def generate_jwt(self, username):
        """Create an HS256-signed JWT for ``username`` expiring in 7 days.

        Args:
            username: Value stored in the token's ``username`` claim.

        Returns:
            The encoded token as returned by ``jwt.encode``.

        Raises:
            ValueError: If ``jwt_secret`` is missing or empty in the
                ``dashboard`` config section.
        """
        secret = self.config["dashboard"].get("jwt_secret", None)
        if not secret:
            raise ValueError("JWT secret is not set in the cmd_config.")

        expires_at = datetime.datetime.now(
            datetime.timezone.utc
        ) + datetime.timedelta(days=7)
        payload = {"username": username, "exp": expires_at}
        return jwt.encode(payload, secret, algorithm="HS256")