Files
Marzban/app/models/user.py
Mohammad 1c26cd1dcd fix: float bug when creating users and showing float data (#1541)
* fix: float bug when creating users and showing float data

* update pydantic

* fix: cast reseted_usage and lifetime_used_traffic to int
2024-12-25 12:43:42 +03:30

367 lines
12 KiB
Python

import re
import secrets
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Union
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
from app import xray
from app.models.admin import Admin
from app.models.proxy import ProxySettings, ProxyTypes
from app.subscription.share import generate_v2ray_links
from app.utils.jwt import create_subscription_token
from config import XRAY_SUBSCRIPTION_PATH, XRAY_SUBSCRIPTION_URL_PREFIX
# Username pattern. The lookahead requires the name to open with a run of
# 3 to 32 word characters that ends on a word boundary; the rest of the
# pattern restricts the characters to ASCII letters, digits, "-", "_", "@"
# and ".".
USERNAME_REGEXP = re.compile(r"^(?=\w{3,32}\b)[a-zA-Z0-9-_@.]+(?:_[a-zA-Z0-9-_@.]+)*$")
class ReminderType(str, Enum):
    """String-valued enum naming the available reminder kinds."""

    expiration_date = "expiration_date"
    data_usage = "data_usage"
class UserStatus(str, Enum):
    """String-valued enum listing every status a user can have."""

    active = "active"
    disabled = "disabled"
    limited = "limited"
    expired = "expired"
    on_hold = "on_hold"
class UserStatusModify(str, Enum):
    """String-valued enum of the statuses accepted by ``UserModify.status``."""

    active = "active"
    disabled = "disabled"
    on_hold = "on_hold"
class UserStatusCreate(str, Enum):
    """String-valued enum of the statuses accepted by ``UserCreate.status``."""

    active = "active"
    on_hold = "on_hold"
class UserDataLimitResetStrategy(str, Enum):
    """String-valued enum of the data-limit reset strategies."""

    no_reset = "no_reset"
    day = "day"
    week = "week"
    month = "month"
    year = "year"
class NextPlanModel(BaseModel):
    """Schema of the value stored in a user's ``next_plan`` field."""

    # Allows the model to be populated from attribute-bearing objects.
    model_config = ConfigDict(from_attributes=True)

    data_limit: Optional[int] = None
    expire: Optional[int] = None
    add_remaining_traffic: bool = False
    fire_on_either: bool = True
class User(BaseModel):
    """Fields and validators shared by the user schemas in this module.

    ``UserCreate``, ``UserModify`` and ``UserResponse`` derive from this class.
    """

    proxies: Dict[ProxyTypes, ProxySettings] = {}
    # ``nullable`` is supplied through ``json_schema_extra``: passing it as a
    # bare keyword argument to ``Field`` is deprecated in pydantic v2.
    expire: Optional[int] = Field(None, json_schema_extra={"nullable": True})
    data_limit: Optional[int] = Field(
        ge=0, default=None, description="data_limit can be 0 or greater"
    )
    data_limit_reset_strategy: UserDataLimitResetStrategy = (
        UserDataLimitResetStrategy.no_reset
    )
    inbounds: Dict[ProxyTypes, List[str]] = {}
    note: Optional[str] = Field(None, json_schema_extra={"nullable": True})
    sub_updated_at: Optional[datetime] = Field(
        None, json_schema_extra={"nullable": True}
    )
    sub_last_user_agent: Optional[str] = Field(
        None, json_schema_extra={"nullable": True}
    )
    online_at: Optional[datetime] = Field(None, json_schema_extra={"nullable": True})
    on_hold_expire_duration: Optional[int] = Field(
        None, json_schema_extra={"nullable": True}
    )
    on_hold_timeout: Optional[datetime] = Field(
        None, json_schema_extra={"nullable": True}
    )
    auto_delete_in_days: Optional[int] = Field(
        None, json_schema_extra={"nullable": True}
    )
    next_plan: Optional[NextPlanModel] = Field(
        None, json_schema_extra={"nullable": True}
    )

    @field_validator("data_limit", mode="before")
    @classmethod
    def cast_to_int(cls, v):
        """Truncate a float ``data_limit`` to int; pass ``None`` and ints through.

        Raises:
            ValueError: if the value is of any other type, or is a float that
                cannot be converted to an integer.
        """
        if v is None:
            return v
        if isinstance(v, float):
            try:
                return int(v)
            except OverflowError as exc:
                # int(inf) raises OverflowError, which pydantic does not turn
                # into a validation error; re-raise it as ValueError instead.
                raise ValueError("data_limit must be a finite number") from exc
        if isinstance(v, int):
            return v
        raise ValueError("data_limit must be an integer or a float, not a string")

    @field_validator("proxies", mode="before")
    @classmethod
    def validate_proxies(cls, v, values, **kwargs):
        """Require at least one proxy and build settings for each proxy type.

        Raises:
            ValueError: if ``v`` is empty or ``None``.
        """
        if not v:
            raise ValueError("Each user needs at least one proxy")
        return {
            proxy_type: ProxySettings.from_dict(proxy_type, v.get(proxy_type, {}))
            for proxy_type in v
        }

    # ``username`` is declared by subclasses, hence ``check_fields=False``.
    @field_validator("username", check_fields=False)
    @classmethod
    def validate_username(cls, v):
        """Reject usernames that do not match ``USERNAME_REGEXP``."""
        if not USERNAME_REGEXP.match(v):
            raise ValueError(
                "Username only can be 3 to 32 characters and contain a-z, 0-9, and underscores in between."
            )
        return v

    @field_validator("note", check_fields=False)
    @classmethod
    def validate_note(cls, v):
        """Reject notes longer than 500 characters."""
        if v and len(v) > 500:
            raise ValueError("User's note can be a maximum of 500 character")
        return v

    @field_validator("on_hold_expire_duration", "on_hold_timeout", mode="before")
    @classmethod
    def validate_timeout(cls, v, values):
        """Normalize a ``0`` or ``None`` input to ``None``; keep anything else."""
        if v in (0, None):
            return None
        return v
class UserCreate(User):
    """Schema for creating a user: ``User`` plus a username and initial status."""

    username: str
    status: UserStatusCreate = None
    model_config = ConfigDict(json_schema_extra={
        "example": {
            "username": "user1234",
            "proxies": {
                "vmess": {"id": "35e4e39c-7d5c-4f4b-8b71-558e4f37ff53"},
                "vless": {},
            },
            "inbounds": {
                "vmess": ["VMess TCP", "VMess Websocket"],
                "vless": ["VLESS TCP REALITY", "VLESS GRPC REALITY"],
            },
            "next_plan": {
                "data_limit": 0,
                "expire": 0,
                "add_remaining_traffic": False,
                "fire_on_either": True
            },
            "expire": 0,
            "data_limit": 0,
            "data_limit_reset_strategy": "no_reset",
            "status": "active",
            "note": "",
            "on_hold_timeout": "2023-11-03T20:30:00",
            "on_hold_expire_duration": 0,
        }
    })

    @property
    def excluded_inbounds(self):
        """Map each proxy type in ``proxies`` to the inbound tags not selected for it.

        Tags come from ``xray.config.inbounds_by_protocol``; a tag is excluded
        when it is absent from ``self.inbounds`` for that proxy type.
        """
        excluded = {}
        for proxy_type in self.proxies:
            selected = self.inbounds.get(proxy_type, [])
            excluded[proxy_type] = [
                inbound["tag"]
                for inbound in xray.config.inbounds_by_protocol.get(proxy_type, [])
                if inbound["tag"] not in selected
            ]
        return excluded

    @field_validator("inbounds", mode="before")
    @classmethod
    def validate_inbounds(cls, inbounds, values, **kwargs):
        """Restrict inbounds to the enabled proxy types and fill in defaults.

        Entries for proxy types missing from the already-validated ``proxies``
        are dropped. Each enabled proxy type with no tags (missing or empty
        list) receives every tag xray knows for that protocol.

        Raises:
            ValueError: if a tag is not present in ``xray.config.inbounds_by_tag``.
        """
        if not isinstance(inbounds, dict):
            # Hand non-dict input (e.g. null) to pydantic's own type check
            # instead of failing with AttributeError here.
            return inbounds

        proxies = values.data.get("proxies", [])

        # Filter into a new dict so the caller's input is not mutated.
        result = {
            proxy_type: tags
            for proxy_type, tags in inbounds.items()
            if proxy_type in proxies
        }

        for proxy_type in proxies:
            tags = result.get(proxy_type)
            if tags:
                for tag in tags:
                    if tag not in xray.config.inbounds_by_tag:
                        raise ValueError(f"Inbound {tag} doesn't exist")
            else:
                # An explicitly empty list is treated like a missing one.
                result[proxy_type] = [
                    i["tag"]
                    for i in xray.config.inbounds_by_protocol.get(proxy_type, [])
                ]
        return result

    @field_validator("status", mode="before")
    @classmethod
    def validate_status(cls, status, values):
        """Check the ``on_hold`` preconditions against already-validated fields.

        Raises:
            ValueError: if the status is ``on_hold`` and either
                ``on_hold_expire_duration`` is unset/zero or ``expire`` is set.
        """
        on_hold_expire = values.data.get("on_hold_expire_duration")
        expire = values.data.get("expire")
        if status == UserStatusCreate.on_hold:
            if on_hold_expire == 0 or on_hold_expire is None:
                raise ValueError("User cannot be on hold without a valid on_hold_expire_duration.")
            if expire:
                raise ValueError("User cannot be on hold with specified expire.")
        return status
class UserModify(User):
    """Schema for modifying a user; overrides several ``User`` validators."""

    status: UserStatusModify = None
    data_limit_reset_strategy: UserDataLimitResetStrategy = None
    model_config = ConfigDict(json_schema_extra={
        "example": {
            "proxies": {
                "vmess": {"id": "35e4e39c-7d5c-4f4b-8b71-558e4f37ff53"},
                "vless": {},
            },
            "inbounds": {
                "vmess": ["VMess TCP", "VMess Websocket"],
                "vless": ["VLESS TCP REALITY", "VLESS GRPC REALITY"],
            },
            "next_plan": {
                "data_limit": 0,
                "expire": 0,
                "add_remaining_traffic": False,
                "fire_on_either": True
            },
            "expire": 0,
            "data_limit": 0,
            "data_limit_reset_strategy": "no_reset",
            "status": "active",
            "note": "",
            "on_hold_timeout": "2023-11-03T20:30:00",
            "on_hold_expire_duration": 0,
        }
    })

    @property
    def excluded_inbounds(self):
        """Map each proxy type in ``inbounds`` to the inbound tags not selected for it.

        Iterates ``self.inbounds`` rather than ``self.proxies`` because
        ``proxies`` is optional when modifying.
        """
        excluded = {}
        for proxy_type in self.inbounds:
            selected = self.inbounds.get(proxy_type, [])
            excluded[proxy_type] = [
                inbound["tag"]
                for inbound in xray.config.inbounds_by_protocol.get(proxy_type, [])
                if inbound["tag"] not in selected
            ]
        return excluded

    @field_validator("inbounds", mode="before")
    @classmethod
    def validate_inbounds(cls, inbounds, values, **kwargs):
        """Check that every supplied inbound tag exists in the xray config.

        ``proxies`` is optional when modifying, so inbounds are validated on
        their own and returned unchanged.

        Raises:
            ValueError: if a tag is not present in ``xray.config.inbounds_by_tag``.
        """
        if not inbounds or not isinstance(inbounds, dict):
            # Empty or malformed input is left for pydantic's type validation.
            return inbounds
        for tags in inbounds.values():
            # ``tags or ()`` avoids a TypeError when a tag list is null.
            for tag in tags or ():
                if tag not in xray.config.inbounds_by_tag:
                    raise ValueError(f"Inbound {tag} doesn't exist")
        return inbounds

    @field_validator("proxies", mode="before")
    @classmethod
    def validate_proxies(cls, v):
        """Build settings for each supplied proxy type; an empty dict is allowed."""
        if v is None:
            # Let pydantic reject null with a validation error rather than
            # raising TypeError while iterating it.
            return v
        return {
            proxy_type: ProxySettings.from_dict(proxy_type, v.get(proxy_type, {}))
            for proxy_type in v
        }

    @field_validator("status", mode="before")
    @classmethod
    def validate_status(cls, status, values):
        """Check the ``on_hold`` preconditions against already-validated fields.

        Raises:
            ValueError: if the status is ``on_hold`` and either
                ``on_hold_expire_duration`` is unset/zero or ``expire`` is set.
        """
        on_hold_expire = values.data.get("on_hold_expire_duration")
        expire = values.data.get("expire")
        # Compare against this model's own status enum.
        if status == UserStatusModify.on_hold:
            if on_hold_expire == 0 or on_hold_expire is None:
                raise ValueError("User cannot be on hold without a valid on_hold_expire_duration.")
            if expire:
                raise ValueError("User cannot be on hold with specified expire.")
        return status
class UserResponse(User):
    """Schema for returning a user, including usage, links and subscription URL."""

    username: str
    status: UserStatus
    used_traffic: int
    lifetime_used_traffic: int = 0
    created_at: datetime
    links: List[str] = []
    subscription_url: str = ""
    proxies: dict
    excluded_inbounds: Dict[ProxyTypes, List[str]] = {}
    admin: Optional[Admin] = None
    model_config = ConfigDict(from_attributes=True)

    @model_validator(mode="after")
    def validate_links(self):
        """Generate ``links`` with ``generate_v2ray_links`` when none were given."""
        if not self.links:
            self.links = generate_v2ray_links(
                self.proxies, self.inbounds, extra_data=self.model_dump(), reverse=False,
            )
        return self

    @model_validator(mode="after")
    def validate_subscription_url(self):
        """Build ``subscription_url`` when it is empty.

        The URL is the configured prefix (each ``*`` replaced by a random hex
        salt), the subscription path, and a token created for the username.
        """
        if not self.subscription_url:
            salt = secrets.token_hex(8)
            url_prefix = XRAY_SUBSCRIPTION_URL_PREFIX.replace('*', salt)
            token = create_subscription_token(self.username)
            self.subscription_url = f"{url_prefix}/{XRAY_SUBSCRIPTION_PATH}/{token}"
        return self

    @field_validator("proxies", mode="before")
    @classmethod
    def validate_proxies(cls, v, values, **kwargs):
        """Accept a list of objects with ``type``/``settings`` as well as a dict.

        A list is converted to ``{type: settings}`` and then passed to the
        parent validator.
        """
        if isinstance(v, list):
            v = {p.type: p.settings for p in v}
        return super().validate_proxies(v, values, **kwargs)

    # This validator has the same name as ``User.cast_to_int`` and therefore
    # replaces it on this model, so ``data_limit`` is listed here as well to
    # keep its float-to-int cast.
    @field_validator("data_limit", "used_traffic", "lifetime_used_traffic", mode="before")
    @classmethod
    def cast_to_int(cls, v):
        """Truncate a float to int; pass ``None`` and ints through.

        Raises:
            ValueError: if the value is of any other type, or is a float that
                cannot be converted to an integer.
        """
        if v is None:
            return v
        if isinstance(v, float):
            try:
                return int(v)
            except OverflowError as exc:
                # int(inf) raises OverflowError, which pydantic does not turn
                # into a validation error; re-raise it as ValueError instead.
                raise ValueError("must be a finite number") from exc
        if isinstance(v, int):
            return v
        raise ValueError("must be an integer or a float, not a string")
class SubscriptionUserResponse(UserResponse):
    """``UserResponse`` variant that excludes some fields from serialization.

    ``admin``, ``excluded_inbounds``, ``note``, ``inbounds`` and
    ``auto_delete_in_days`` are redeclared as optional with ``exclude=True``.
    """

    model_config = ConfigDict(from_attributes=True)

    admin: Optional[Admin] = Field(None, exclude=True)
    excluded_inbounds: Optional[Dict[ProxyTypes, List[str]]] = Field(
        default=None, exclude=True
    )
    note: Optional[str] = Field(default=None, exclude=True)
    inbounds: Optional[Dict[ProxyTypes, List[str]]] = Field(
        default=None, exclude=True
    )
    auto_delete_in_days: Optional[int] = Field(default=None, exclude=True)
class UsersResponse(BaseModel):
    """A list of ``UserResponse`` items together with a ``total`` count."""

    users: List[UserResponse]
    total: int
class UserUsageResponse(BaseModel):
    """Usage record carrying a node id, a node name and the traffic used."""

    node_id: Union[int, None] = None
    node_name: str
    used_traffic: int

    @field_validator("used_traffic", mode="before")
    @classmethod
    def cast_to_int(cls, v):
        """Truncate a float to int; pass ``None`` and ints through.

        Raises:
            ValueError: if the value is of any other type, or is a float that
                cannot be converted to an integer.
        """
        if v is None:
            return v
        if isinstance(v, float):
            try:
                return int(v)
            except OverflowError as exc:
                # int(inf) raises OverflowError, which pydantic does not turn
                # into a validation error; re-raise it as ValueError instead.
                raise ValueError("must be a finite number") from exc
        if isinstance(v, int):
            return v
        raise ValueError("must be an integer or a float, not a string")
class UserUsagesResponse(BaseModel):
    """A username paired with its list of ``UserUsageResponse`` records."""

    username: str
    usages: List[UserUsageResponse]
class UsersUsagesResponse(BaseModel):
    """A list of ``UserUsageResponse`` records with no username attached."""

    usages: List[UserUsageResponse]