Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions astrbot/cli/commands/cmd_conf.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import hashlib
import json
import zoneinfo
from collections.abc import Callable
from typing import Any

import click

from astrbot.core.utils.auth_password import (
hash_dashboard_password,
validate_dashboard_password,
)

from ..utils import check_astrbot_root, get_astrbot_root


Expand Down Expand Up @@ -39,9 +43,11 @@ def _validate_dashboard_username(value: str) -> str:

def _validate_dashboard_password(value: str) -> str:
"""Validate Dashboard password"""
if not value:
raise click.ClickException("Password cannot be empty")
return hashlib.md5(value.encode()).hexdigest()
try:
validate_dashboard_password(value)
except ValueError as e:
raise click.ClickException(str(e))
return hash_dashboard_password(value)


def _validate_timezone(value: str) -> str:
Expand Down
10 changes: 10 additions & 0 deletions astrbot/core/config/astrbot_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import os

from astrbot.core.utils.astrbot_path import get_astrbot_data_path
from astrbot.core.utils.auth_password import (
normalize_dashboard_password_hash,
)

from .default import DEFAULT_CONFIG, DEFAULT_VALUE_MAP

Expand Down Expand Up @@ -59,6 +62,13 @@ def __init__(

# 检查配置完整性,并插入
has_new = self.check_config_integrity(default_config, conf)
if (
"dashboard" in conf
and isinstance(conf["dashboard"], dict)
and not conf["dashboard"].get("password")
):
conf["dashboard"]["password"] = normalize_dashboard_password_hash("")
has_new = True
self.update(conf)
if has_new:
self.save_config()
Expand Down
2 changes: 1 addition & 1 deletion astrbot/core/config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@
"dashboard": {
"enable": True,
"username": "astrbot",
"password": "77b90590a8945a7d36c963981a307dc9",
"password": "",
"jwt_secret": "",
"host": "0.0.0.0",
"port": 6185,
Expand Down
113 changes: 113 additions & 0 deletions astrbot/core/utils/auth_password.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""Utilities for dashboard password hashing and verification."""

import hashlib
import hmac
import re
import secrets

_PBKDF2_ITERATIONS = 200_000
_PBKDF2_SALT_BYTES = 16
_PBKDF2_ALGORITHM = "pbkdf2_sha256"
_PBKDF2_FORMAT = f"{_PBKDF2_ALGORITHM}$"
_LEGACY_MD5_LENGTH = 32
_DASHBOARD_PASSWORD_MIN_LENGTH = 12
DEFAULT_DASHBOARD_PASSWORD = "astrbot"


def hash_dashboard_password(raw_password: str) -> str:
"""Return a salted hash for dashboard password using PBKDF2-HMAC-SHA256."""
if not isinstance(raw_password, str) or raw_password == "":
raise ValueError("Password cannot be empty")

salt = secrets.token_hex(_PBKDF2_SALT_BYTES)
digest = hashlib.pbkdf2_hmac(
"sha256",
raw_password.encode("utf-8"),
bytes.fromhex(salt),
_PBKDF2_ITERATIONS,
).hex()
return f"{_PBKDF2_FORMAT}{_PBKDF2_ITERATIONS}${salt}${digest}"


def validate_dashboard_password(raw_password: str) -> None:
"""Validate whether dashboard password meets the minimal complexity policy."""
if not isinstance(raw_password, str) or raw_password == "":
raise ValueError("Password cannot be empty")
if len(raw_password) < _DASHBOARD_PASSWORD_MIN_LENGTH:
raise ValueError(
f"Password must be at least {_DASHBOARD_PASSWORD_MIN_LENGTH} characters long"
)

if not re.search(r"[A-Z]", raw_password):
raise ValueError("Password must include at least one uppercase letter")
if not re.search(r"[a-z]", raw_password):
raise ValueError("Password must include at least one lowercase letter")
if not re.search(r"\d", raw_password):
raise ValueError("Password must include at least one digit")


def normalize_dashboard_password_hash(stored_password: str) -> str:
"""Ensure dashboard password has a value, fallback to default dashboard password hash."""
if not stored_password:
return hash_dashboard_password(DEFAULT_DASHBOARD_PASSWORD)
return stored_password


def _is_legacy_md5_hash(stored: str) -> bool:
return (
isinstance(stored, str)
and len(stored) == _LEGACY_MD5_LENGTH
and all(c in "0123456789abcdefABCDEF" for c in stored)
)


def _is_pbkdf2_hash(stored: str) -> bool:
return isinstance(stored, str) and stored.startswith(_PBKDF2_FORMAT)


def verify_dashboard_password(stored_hash: str, candidate_password: str) -> bool:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): Consider refactoring verify_dashboard_password into a small dispatcher plus per-scheme helpers so each hash type’s detection, parsing, and verification are clearly separated.

verify_dashboard_password is carrying multiple responsibilities (scheme detection, parsing, and verification) and mixing semantics in a single branch. You can keep all behavior but make it easier to read and test by introducing a small dispatcher + per-scheme helpers.

For example:

from enum import Enum, auto

class _HashScheme(Enum):
    LEGACY_MD5 = auto()
    PBKDF2 = auto()
    UNKNOWN = auto()


def _get_hash_scheme(stored_hash: str) -> _HashScheme:
    if _is_legacy_md5_hash(stored_hash):
        return _HashScheme.LEGACY_MD5
    if _is_pbkdf2_hash(stored_hash):
        return _HashScheme.PBKDF2
    return _HashScheme.UNKNOWN

Then split the verification logic into named functions so the compatibility behavior is explicit:

def _verify_legacy_md5_hash(stored_hash: str, candidate_password: str) -> bool:
    # Keep compatibility with existing md5-based deployments:
    # new clients send plain password, old clients may send md5 of it.
    candidate_md5 = hashlib.md5(candidate_password.encode("utf-8")).hexdigest()
    stored = stored_hash.lower()
    return (
        hmac.compare_digest(stored, candidate_md5.lower())
        or hmac.compare_digest(stored, candidate_password.lower())
    )


def _verify_pbkdf2_hash(stored_hash: str, candidate_password: str) -> bool:
    parts: list[str] = stored_hash.split("$")
    if len(parts) != 4:
        return False
    _, iterations_s, salt, digest = parts
    try:
        iterations = int(iterations_s)
        stored_key = bytes.fromhex(digest)
        salt_bytes = bytes.fromhex(salt)
    except (TypeError, ValueError):
        return False

    candidate_key = hashlib.pbkdf2_hmac(
        "sha256",
        candidate_password.encode("utf-8"),
        salt_bytes,
        iterations,
    )
    return hmac.compare_digest(stored_key, candidate_key)

verify_dashboard_password then becomes a straightforward dispatcher:

def verify_dashboard_password(stored_hash: str, candidate_password: str) -> bool:
    if not isinstance(stored_hash, str) or not isinstance(candidate_password, str):
        return False

    scheme = _get_hash_scheme(stored_hash)
    if scheme is _HashScheme.LEGACY_MD5:
        return _verify_legacy_md5_hash(stored_hash, candidate_password)
    if scheme is _HashScheme.PBKDF2:
        return _verify_pbkdf2_hash(stored_hash, candidate_password)
    return False

This keeps all existing behavior (including the legacy MD5 dual comparison and default-password checks) but makes each piece self-contained and easier to reason about and test individually.

"""Verify password against legacy md5 or new PBKDF2-SHA256 format."""
if not isinstance(stored_hash, str) or not isinstance(candidate_password, str):
return False

if _is_legacy_md5_hash(stored_hash):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to enforce user to update the password by a specially constructed function, rather than being inserted into the validation stream.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently after user updates astrbot and opens the WebUI, a mandatory password change dialog will appear.

# Keep compatibility with existing md5-based deployments:
# new clients send plain password, old clients may send md5 of it.
candidate_md5 = hashlib.md5(candidate_password.encode("utf-8")).hexdigest()

Check failure

Code scanning / CodeQL

Use of a broken or weak cryptographic hashing algorithm on sensitive data High

Sensitive data (password)
is used in a hashing algorithm (MD5) that is insecure for password hashing, since it is not a computationally expensive hash function.
Sensitive data (password)
is used in a hashing algorithm (MD5) that is insecure for password hashing, since it is not a computationally expensive hash function.
Sensitive data (password) is used in a hashing algorithm (MD5) that is insecure for password hashing, since it is not a computationally expensive hash function.
Sensitive data (password) is used in a hashing algorithm (MD5) that is insecure for password hashing, since it is not a computationally expensive hash function.
Sensitive data (password) is used in a hashing algorithm (MD5) that is insecure for password hashing, since it is not a computationally expensive hash function.
return hmac.compare_digest(
stored_hash.lower(), candidate_md5.lower()
) or hmac.compare_digest(
stored_hash.lower(),
candidate_password.lower(),
)

if _is_pbkdf2_hash(stored_hash):
parts: list[str] = stored_hash.split("$")
if len(parts) != 4:
return False
_, iterations_s, salt, digest = parts
try:
iterations = int(iterations_s)
stored_key = bytes.fromhex(digest)
salt_bytes = bytes.fromhex(salt)
except (TypeError, ValueError):
return False
candidate_key = hashlib.pbkdf2_hmac(
"sha256",
candidate_password.encode("utf-8"),
salt_bytes,
iterations,
)
return hmac.compare_digest(stored_key, candidate_key)

return False


def is_default_dashboard_password(stored_hash: str) -> bool:
"""Check whether the password still equals the built-in default value."""
return verify_dashboard_password(stored_hash, DEFAULT_DASHBOARD_PASSWORD)


def is_legacy_dashboard_password(stored_hash: str) -> bool:
"""Check whether the password is still stored with legacy MD5."""
return _is_legacy_md5_hash(stored_hash)
46 changes: 40 additions & 6 deletions astrbot/dashboard/routes/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@

from astrbot import logger
from astrbot.core import DEMO_MODE
from astrbot.core.utils.auth_password import (
hash_dashboard_password,
is_default_dashboard_password,
is_legacy_dashboard_password,
validate_dashboard_password,
verify_dashboard_password,
)

from .route import Response, Route, RouteContext

Expand All @@ -23,15 +30,29 @@ async def login(self):
username = self.config["dashboard"]["username"]
password = self.config["dashboard"]["password"]
post_data = await request.json
if post_data["username"] == username and post_data["password"] == password:

req_username = (
post_data.get("username") if isinstance(post_data, dict) else None
)
req_password = (
post_data.get("password") if isinstance(post_data, dict) else None
)
if not isinstance(req_username, str) or not isinstance(req_password, str):
return Response().error("Invalid request payload").__dict__

if req_username == username and verify_dashboard_password(
password, req_password
):
change_pwd_hint = False
legacy_pwd_hint = is_legacy_dashboard_password(password)
if (
username == "astrbot"
and password == "77b90590a8945a7d36c963981a307dc9"
and is_default_dashboard_password(password)
and not DEMO_MODE
):
change_pwd_hint = True
logger.warning("为了保证安全,请尽快修改默认密码。")
logger.warning("检测到默认管理员凭据,请尽快修改密码。")
legacy_pwd_hint = True

return (
Response()
Expand All @@ -40,6 +61,7 @@ async def login(self):
"token": self.generate_jwt(username),
"username": username,
"change_pwd_hint": change_pwd_hint,
"legacy_pwd_hint": legacy_pwd_hint,
},
)
.__dict__
Expand All @@ -57,8 +79,14 @@ async def edit_account(self):

password = self.config["dashboard"]["password"]
post_data = await request.json
if not isinstance(post_data, dict):
return Response().error("Invalid request payload").__dict__

req_password = post_data.get("password")
if not isinstance(req_password, str):
return Response().error("Invalid request payload").__dict__

if post_data["password"] != password:
if not verify_dashboard_password(password, req_password):
return Response().error("原密码错误").__dict__

new_pwd = post_data.get("new_password", None)
Expand All @@ -68,10 +96,16 @@ async def edit_account(self):

# Verify password confirmation
if new_pwd:
if not isinstance(new_pwd, str):
return Response().error("新密码无效").__dict__
confirm_pwd = post_data.get("confirm_password", None)
if confirm_pwd != new_pwd:
if not isinstance(confirm_pwd, str) or confirm_pwd != new_pwd:
return Response().error("两次输入的新密码不一致").__dict__
self.config["dashboard"]["password"] = new_pwd
try:
validate_dashboard_password(new_pwd)
except ValueError as e:
return Response().error(str(e)).__dict__
self.config["dashboard"]["password"] = hash_dashboard_password(new_pwd)
if new_username:
self.config["dashboard"]["username"] = new_username

Expand Down
13 changes: 10 additions & 3 deletions astrbot/dashboard/routes/stat.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import aiohttp
import psutil
from quart import request
from sqlmodel import select
from sqlmodel import col, select

from astrbot.core import DEMO_MODE, logger
from astrbot.core.config import VERSION
Expand All @@ -21,6 +21,10 @@
from astrbot.core.db.migration.helper import check_migration_needed_v4
from astrbot.core.db.po import ProviderStat
from astrbot.core.utils.astrbot_path import get_astrbot_path
from astrbot.core.utils.auth_password import (
is_default_dashboard_password,
is_legacy_dashboard_password,
)
from astrbot.core.utils.io import get_dashboard_version
from astrbot.core.utils.storage_cleaner import StorageCleaner
from astrbot.core.utils.version_comparator import VersionComparator
Expand Down Expand Up @@ -76,7 +80,7 @@ def is_default_cred(self):
password = self.config["dashboard"]["password"]
return (
username == "astrbot"
and password == "77b90590a8945a7d36c963981a307dc9"
and is_default_dashboard_password(password)
and not DEMO_MODE
)

Expand All @@ -90,6 +94,9 @@ async def get_version(self):
"version": VERSION,
"dashboard_version": await get_dashboard_version(),
"change_pwd_hint": self.is_default_cred(),
"legacy_pwd_hint": is_legacy_dashboard_password(
self.config["dashboard"]["password"]
),
"need_migration": need_migration,
},
)
Expand Down Expand Up @@ -226,7 +233,7 @@ async def get_provider_token_stats(self):
ProviderStat.agent_type == "internal",
ProviderStat.created_at >= query_start_utc,
)
.order_by(ProviderStat.created_at.asc())
.order_by(col(ProviderStat.created_at).asc())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Using col(ProviderStat.created_at) instead of ProviderStat.created_at directly is likely incorrect and could break the query.

This was previously .order_by(ProviderStat.created_at.asc()), which is the standard SQLAlchemy pattern. col() is generally used with string column names, and passing an ORM attribute may be unnecessary or even alter the generated SQL depending on the SQLAlchemy version. Unless there’s a specific need for col() here, please revert to ProviderStat.created_at.asc().

)
records = result.scalars().all()

Expand Down
9 changes: 5 additions & 4 deletions dashboard/src/i18n/locales/en-US/core/header.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,21 @@
},
"accountDialog": {
"title": "Modify Account",
"securityWarning": "Security Reminder: Please change the default password to ensure account security",
"securityWarning": "Security Reminder: Please change your password to secure your account",
"securityWarningLegacy": "The new AstrBot version has improved security. Please change your password.",
"form": {
"currentPassword": "Current Password",
"newPassword": "New Password",
"confirmPassword": "Confirm New Password",
"newUsername": "New Username (Optional)",
"passwordHint": "Password must be at least 8 characters",
"passwordHint": "Password must be at least 12 characters",
"confirmPasswordHint": "Please enter new password again to confirm",
"usernameHint": "Leave blank to keep current username",
"defaultCredentials": "Default username and password are both astrbot"
"defaultCredentials": "The new AstrBot version has improved security. Please change your password."
},
"validation": {
"passwordRequired": "Please enter password",
"passwordMinLength": "Password must be at least 8 characters",
"passwordMinLength": "Password must be at least 12 characters",
"passwordMatch": "Passwords do not match",
"usernameMinLength": "Username must be at least 3 characters"
},
Expand Down
4 changes: 2 additions & 2 deletions dashboard/src/i18n/locales/en-US/features/auth.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"login": "Login",
"username": "Username",
"password": "Password",
"defaultHint": "Default username and password: astrbot",
"defaultHint": "The new AstrBot version has improved security. Please change your password.",
"logo": {
"title": "AstrBot Dashboard",
"subtitle": "Welcome"
Expand All @@ -11,4 +11,4 @@
"switchToDark": "Switch to Dark Theme",
"switchToLight": "Switch to Light Theme"
}
}
}
4 changes: 2 additions & 2 deletions dashboard/src/i18n/locales/en-US/messages/validation.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
"fileType": "Unsupported file type",
"required_field": "Please fill in the required field",
"invalid_format": "Invalid format",
"password_too_short": "Password must be at least 8 characters",
"password_too_short": "Password must be at least 12 characters",
"password_too_weak": "Password is too weak",
"invalid_phone": "Please enter a valid phone number",
"invalid_date": "Please enter a valid date",
"date_range": "Invalid date range",
"upload_failed": "File upload failed",
"network_error": "Network connection error, please try again",
"operation_cannot_be_undone": "⚠️ This operation cannot be undone, please choose carefully!"
}
}
Loading
Loading