import random import json from config.defaults import USER_ACLS, ACCESS_TOKEN_FORMAT, ACCEPT_LANGUAGES from components.utils import ensure_list, to_unique_sorted_str_list from components.utils.datetimes import ntime_utc_now, utc_now_as_str from webauthn.helpers.structs import AuthenticatorTransport from components.models import * class TokenConfirmation(BaseModel): confirmation_code: Annotated[int, AfterValidator(lambda i: "%06d" % i)] token: str = constr(strip_whitespace=True, min_length=14, max_length=14) class AuthToken(BaseModel): login: str = constr(strip_whitespace=True, min_length=1) @computed_field @property def token(self) -> str: return "%04d-%04d-%04d" % ( random.randint(0, 9999), random.randint(0, 9999), random.randint(0, 9999), ) class UserProfile(BaseModel): model_config = ConfigDict(validate_assignment=True) @field_validator("email", mode="before") def email_validator(cls, v): if v in [None, ""]: return "" try: email = validate_email(v, check_deliverability=False).ascii_email except: raise PydanticCustomError( "email_invalid", "The provided email address is invalid", dict(provided_email=v), ) return email @field_validator("access_tokens", mode="after") def access_tokens_validator(cls, v): for s in v: try: TypeAdapter(ACCESS_TOKEN_FORMAT).validate_python(s) except ValidationError as e: s_priv = s[:3] + (s[3:] and "***") raise PydanticCustomError( "access_tokens", f"The provided token {s_priv} is invalid", dict(), ) return v @field_validator("tresor", mode="after") def parse_tresor(cls, v): try: if v: tresor = json.loads(v) assert all( k in tresor.keys() for k in ["public_key_pem", "wrapped_private_key", "iv", "salt"] ) else: v = None return v except: raise PydanticCustomError( "tresor", f"The provided tresor data is invalid", dict(), ) tresor: str | None = Field( default=None, json_schema_extra={ "title": "🔐 Personal tresor", "type": "tresor", "input_extra": 'autocomplete="off" autocorrect="off" autocapitalize="off" spellcheck="false"', "form_id": f"tresor-{str(uuid4())}", }, ) email: str = Field( default="", json_schema_extra={ "title": "Email address", "description": "Your email address is optional", "type": "email", "input_extra": 'autocomplete="off" autocorrect="off" autocapitalize="off" spellcheck="false"', "form_id": f"email-{str(uuid4())}", }, ) access_tokens: Annotated[ str | list, AfterValidator(lambda x: to_unique_sorted_str_list(ensure_list(x))), ] = Field( default=[], json_schema_extra={ "title": "Access tokens", "description": "Tokens to access the API. Save profile after removing a token.", "type": "list:text", "input_extra": 'autocomplete="off" autocorrect="off" autocapitalize="off" spellcheck="false"', "form_id": f"access-tokens-{str(uuid4())}", }, ) permit_auth_requests: Annotated[ Literal[True, False], BeforeValidator(lambda x: True if str(x).lower() == "true" else False), AfterValidator(lambda x: True if str(x).lower() == "true" else False), ] = Field( default=True, json_schema_extra={ "title": "Authentication requests", "description": "Allow other devices to issue authentication requests to active sessions via pop-up", "type": "radio", "input_extra": 'autocomplete="off"', "form_id": f"proxy-login-{str(uuid4())}", }, ) updated: str | None = None class Credential(BaseModel): id: Annotated[str, AfterValidator(lambda x: bytes.fromhex(x))] | bytes public_key: Annotated[str, AfterValidator(lambda x: bytes.fromhex(x))] | bytes friendly_name: constr(strip_whitespace=True, min_length=1) last_login: str sign_count: int transports: list[AuthenticatorTransport] | None = [] active: bool updated: str created: str @field_serializer("id", "public_key") def serialize_bytes_to_hex(self, v: bytes, _info): return v.hex() if isinstance(v, bytes) else v class CredentialAdd(BaseModel): id: Annotated[str, AfterValidator(lambda x: bytes.fromhex(x))] | bytes public_key: Annotated[str, AfterValidator(lambda x: bytes.fromhex(x))] | bytes sign_count: int friendly_name: constr(strip_whitespace=True, min_length=1) = "New passkey" transports: list[AuthenticatorTransport] | None = [] active: bool = True last_login: str = "" @computed_field @property def created(self) -> str: return utc_now_as_str() @computed_field @property def updated(self) -> str: return utc_now_as_str() @field_serializer("id", "public_key") def serialize_bytes_to_hex(self, v: bytes, _info): return v.hex() if isinstance(v, bytes) else v class User(BaseModel): model_config = ConfigDict(validate_assignment=True) id: Annotated[str, AfterValidator(lambda v: str(UUID(v)))] login: constr(strip_whitespace=True, min_length=1) credentials: list[Credential | CredentialAdd] = [] acl: Annotated[ Literal[*USER_ACLS] | list[Literal[*USER_ACLS]], AfterValidator(lambda v: ensure_list(v)), ] groups: Annotated[ constr(strip_whitespace=True, min_length=1) | list[constr(strip_whitespace=True, min_length=1)], AfterValidator(lambda v: ensure_list(v)), ] = [] profile: UserProfile created: str updated: str class UserGroups(BaseModel): name: constr(strip_whitespace=True, min_length=1) new_name: constr(strip_whitespace=True, min_length=1) members: Annotated[ str | list, AfterValidator(lambda x: to_unique_sorted_str_list(ensure_list(x))), ] = [] class UserAdd(BaseModel): login: str = constr(strip_whitespace=True, min_length=1) credentials: list[str] = [] acl: list[Literal[*USER_ACLS]] = ["user"] profile: UserProfile = UserProfile.parse_obj({}) groups: list[constr(strip_whitespace=True, min_length=1)] = [] @computed_field @property def id(self) -> str: return str(uuid4()) @computed_field @property def created(self) -> str: return utc_now_as_str() @computed_field @property def updated(self) -> str: return utc_now_as_str() class UserPatch(BaseModel): login: str | None = None acl: str | list = [] groups: str | list | None = None @computed_field @property def updated(self) -> str: return utc_now_as_str() class UserProfilePatch(BaseModel): email: str | None = None tresor: str | None = None access_tokens: str | list = [] permit_auth_requests: bool | None = None @computed_field @property def updated(self) -> str: return utc_now_as_str() class CredentialPatch(BaseModel): friendly_name: str | None = None active: bool | None = None last_login: str | None = None sign_count: int | None = None @computed_field @property def updated(self) -> str: return utc_now_as_str() class UserSession(BaseModel): id: str login: str acl: list | str cred_id: str | None = None lang: Literal[*ACCEPT_LANGUAGES] = "en" profile: dict | UserProfile | None = {} login_ts: float = Field(default_factory=ntime_utc_now)