mirror of
https://github.com/Gozargah/Marzban.git
synced 2026-05-17 00:25:53 +03:00
* chore: update pydantic to version 2.10.2 and refactor model validators * refactor: simplify field validators by removing unnecessary pre and always flags * remove typing_extensions==4.9.0 from requirements.txt * refactor: remove allow_reuse flag from status field validator * refactor: simplify field validators by removing pre and always flags * refactor: update user model imports and enhance account class with abstract method * refactor: update model_config to use dictionary format in Admin and SubscriptionUserResponse classes * fix typo in UserDataResetByNext * change pre=True to mode="before" * refactor: update validation methods and model configuration in User and Proxy classes * change pre=False with mode="after" * Migrated to Pydantic V2 * fix: custom subscriptions not workong * some small changes * add missing properties to example schema * replace from_orm with model_validate --------- Co-authored-by: MahdiButcher <madibutchercoding@gmail.com> Co-authored-by: Mahdi Butcher <MahdiButcherCoding@gmail.com>
116 lines
4.3 KiB
Python
116 lines
4.3 KiB
Python
from typing import Optional, Union
|
|
from app.models.admin import AdminInDB, AdminValidationResult, Admin
|
|
from app.models.user import UserResponse, UserStatus
|
|
from app.db import Session, crud, get_db
|
|
from config import SUDOERS
|
|
from fastapi import Depends, HTTPException
|
|
from datetime import datetime, timezone, timedelta
|
|
from app.utils.jwt import get_subscription_payload
|
|
|
|
|
|
def validate_admin(db: Session, username: str, password: str) -> Optional[AdminValidationResult]:
|
|
"""Validate admin credentials with environment variables or database."""
|
|
if SUDOERS.get(username) == password:
|
|
return AdminValidationResult(username=username, is_sudo=True)
|
|
|
|
dbadmin = crud.get_admin(db, username)
|
|
if dbadmin and AdminInDB.model_validate(dbadmin).verify_password(password):
|
|
return AdminValidationResult(username=dbadmin.username, is_sudo=dbadmin.is_sudo)
|
|
|
|
return None
|
|
|
|
|
|
def get_admin_by_username(username: str, db: Session = Depends(get_db)):
|
|
"""Fetch an admin by username from the database."""
|
|
dbadmin = crud.get_admin(db, username)
|
|
if not dbadmin:
|
|
raise HTTPException(status_code=404, detail="Admin not found")
|
|
return dbadmin
|
|
|
|
|
|
def get_dbnode(node_id: int, db: Session = Depends(get_db)):
|
|
"""Fetch a node by its ID from the database, raising a 404 error if not found."""
|
|
dbnode = crud.get_node_by_id(db, node_id)
|
|
if not dbnode:
|
|
raise HTTPException(status_code=404, detail="Node not found")
|
|
return dbnode
|
|
|
|
|
|
def validate_dates(start: Optional[Union[str, datetime]], end: Optional[Union[str, datetime]]) -> (datetime, datetime):
|
|
"""Validate if start and end dates are correct and if end is after start."""
|
|
try:
|
|
if start:
|
|
start_date = start if isinstance(start, datetime) else datetime.fromisoformat(
|
|
start).astimezone(timezone.utc)
|
|
else:
|
|
start_date = datetime.now(timezone.utc) - timedelta(days=30)
|
|
if end:
|
|
end_date = end if isinstance(end, datetime) else datetime.fromisoformat(end).astimezone(timezone.utc)
|
|
if start_date and end_date < start_date:
|
|
raise HTTPException(status_code=400, detail="Start date must be before end date")
|
|
else:
|
|
end_date = datetime.now(timezone.utc)
|
|
|
|
return start_date, end_date
|
|
except ValueError:
|
|
raise HTTPException(status_code=400, detail="Invalid date range or format")
|
|
|
|
|
|
def get_user_template(template_id: int, db: Session = Depends(get_db)):
|
|
"""Fetch a User Template by its ID, raise 404 if not found."""
|
|
dbuser_template = crud.get_user_template(db, template_id)
|
|
if not dbuser_template:
|
|
raise HTTPException(status_code=404, detail="User Template not found")
|
|
return dbuser_template
|
|
|
|
|
|
def get_validated_sub(
|
|
token: str,
|
|
db: Session = Depends(get_db)
|
|
) -> UserResponse:
|
|
sub = get_subscription_payload(token)
|
|
if not sub:
|
|
raise HTTPException(status_code=404, detail="Not Found")
|
|
|
|
dbuser = crud.get_user(db, sub['username'])
|
|
if not dbuser or dbuser.created_at > sub['created_at']:
|
|
raise HTTPException(status_code=404, detail="Not Found")
|
|
|
|
if dbuser.sub_revoked_at and dbuser.sub_revoked_at > sub['created_at']:
|
|
raise HTTPException(status_code=404, detail="Not Found")
|
|
|
|
return dbuser
|
|
|
|
|
|
def get_validated_user(
|
|
username: str,
|
|
admin: Admin = Depends(Admin.get_current),
|
|
db: Session = Depends(get_db)
|
|
) -> UserResponse:
|
|
dbuser = crud.get_user(db, username)
|
|
if not dbuser:
|
|
raise HTTPException(status_code=404, detail="User not found")
|
|
|
|
if not (admin.is_sudo or (dbuser.admin and dbuser.admin.username == admin.username)):
|
|
raise HTTPException(status_code=403, detail="You're not allowed")
|
|
|
|
return dbuser
|
|
|
|
|
|
def get_expired_users_list(db: Session, admin: Admin, expired_after: Optional[datetime] = None,
|
|
expired_before: Optional[datetime] = None):
|
|
expired_before = expired_before or datetime.now(timezone.utc)
|
|
expired_after = expired_after or datetime.min.replace(tzinfo=timezone.utc)
|
|
|
|
dbadmin = crud.get_admin(db, admin.username)
|
|
dbusers = crud.get_users(
|
|
db=db,
|
|
status=[UserStatus.expired, UserStatus.limited],
|
|
admin=dbadmin if not admin.is_sudo else None
|
|
)
|
|
|
|
return [
|
|
u for u in dbusers
|
|
if u.expire and expired_after.timestamp() <= u.expire <= expired_before.timestamp()
|
|
]
|