apeters 1d204f26b8 pre-Korves.Net
Signed-off-by: apeters <apeters@korves.net>
2025-05-21 08:05:07 +00:00

265 lines
7.8 KiB
Python

import random
import json
from config.defaults import USER_ACLS, ACCESS_TOKEN_FORMAT, ACCEPT_LANGUAGES
from components.utils import ensure_list, to_unique_sorted_str_list
from components.utils.datetimes import ntime_utc_now, utc_now_as_str
from webauthn.helpers.structs import AuthenticatorTransport
from components.models import *
# Payload carrying a token together with its numeric confirmation code.
#
# Fields:
#   confirmation_code: validated as an int, then rendered by the
#       after-validator as a string zero-padded to at least six digits.
#   token: required string, whitespace-stripped, exactly 14 characters.
class TokenConfirmation(BaseModel):
    confirmation_code: Annotated[int, AfterValidator(lambda i: "%06d" % i)]
    # The constraints must live in the annotation. Written as
    # `token: str = constr(...)`, the constrained type merely became the
    # field's *default value*: the field was optional and neither the
    # whitespace stripping nor the length limits were ever applied.
    token: constr(strip_whitespace=True, min_length=14, max_length=14)
# Model pairing a login name with a generated token.
#
# Fields:
#   login: required, whitespace-stripped, non-empty string.
#   token: computed field of the form "dddd-dddd-dddd" (14 characters).
class AuthToken(BaseModel):
    # The constraints must live in the annotation. Written as
    # `login: str = constr(...)`, the constrained type only became the default
    # value, leaving the field optional and unvalidated.
    login: constr(strip_whitespace=True, min_length=1)

    @computed_field
    @property
    def token(self) -> str:
        # Imported locally so this block does not depend on a change to the
        # file's import section.
        import secrets

        # The token acts as an authentication secret, so it is drawn from the
        # OS CSPRNG via `secrets` instead of the predictable `random` module.
        # `randbelow(10_000)` covers the same 0..9999 range as before.
        # NOTE: this property is not cached; every access (and therefore
        # every serialization) yields a new value.
        return "%04d-%04d-%04d" % tuple(
            secrets.randbelow(10_000) for _ in range(3)
        )
def _credential_hex_to_bytes(hex_value):
    # Decode a hexadecimal string into raw bytes; bytes.fromhex raises
    # ValueError for malformed input, which pydantic reports as a
    # validation error.
    return bytes.fromhex(hex_value)


# Credential record.
#
# `id` and `public_key` accept either a hex string (decoded to bytes by the
# after-validator) or bytes, and are written back out as hex strings by
# `serialize_bytes_to_hex`.
class Credential(BaseModel):
    id: Annotated[str, AfterValidator(_credential_hex_to_bytes)] | bytes
    public_key: Annotated[str, AfterValidator(_credential_hex_to_bytes)] | bytes
    friendly_name: constr(strip_whitespace=True, min_length=1)
    last_login: str
    sign_count: int
    transports: list[AuthenticatorTransport] | None = []
    active: bool
    updated: str
    created: str

    @field_serializer("id", "public_key")
    def serialize_bytes_to_hex(self, v: bytes, _info):
        """Serialize bytes values as hex; pass any other value through."""
        if isinstance(v, bytes):
            return v.hex()
        return v
def _add_credential_bytes_to_hex(raw):
    # Render raw bytes as a lowercase hexadecimal string.
    return raw.hex()


# Input model for registering a new credential.
#
# `id` and `public_key` accept bytes (converted to a hex string by the
# after-validator) or a string. `created` and `updated` are computed fields
# that return the current UTC timestamp string each time they are read.
class AddCredential(BaseModel):
    id: Annotated[bytes, AfterValidator(_add_credential_bytes_to_hex)] | str
    public_key: Annotated[bytes, AfterValidator(_add_credential_bytes_to_hex)] | str
    sign_count: int
    friendly_name: str = "New passkey"
    transports: list[AuthenticatorTransport] | None = []
    active: bool = True
    last_login: str = ""

    @computed_field
    @property
    def created(self) -> str:
        # Evaluated on access, not stored on the instance.
        now = utc_now_as_str()
        return now

    @computed_field
    @property
    def updated(self) -> str:
        # Evaluated on access, not stored on the instance.
        now = utc_now_as_str()
        return now
# User-editable profile settings.
#
# Each field carries `json_schema_extra` metadata (title, type, input
# attributes, form id). The `form_id` values embed a uuid4 that is generated
# once, when the class body is executed. `validate_assignment=True` makes the
# validators below run on attribute assignment as well as on construction.
class UserProfile(BaseModel):
    model_config = ConfigDict(validate_assignment=True)

    # JSON document (as a string) checked by `parse_tresor`; None/empty
    # means no tresor is set.
    tresor: str | None = Field(
        default=None,
        json_schema_extra={
            "title": "Personal tresor",
            "type": "tresor",
            "input_extra": 'autocomplete="off" autocorrect="off" autocapitalize="off" spellcheck="false"',
            "form_id": f"tresor-{str(uuid4())}",
        },
    )
    # Optional email address; None and "" are normalized to "".
    email: str = Field(
        default="",
        json_schema_extra={
            "title": "Email address",
            "description": "Your email address is optional",
            "type": "email",
            "input_extra": 'autocomplete="off" autocorrect="off" autocapitalize="off" spellcheck="false"',
            "form_id": f"email-{str(uuid4())}",
        },
    )
    # Accepts a single string or a list; normalized through
    # ensure_list/to_unique_sorted_str_list, then each entry is checked
    # against ACCESS_TOKEN_FORMAT by `access_tokens_validator`.
    access_tokens: Annotated[
        str | list,
        AfterValidator(lambda x: to_unique_sorted_str_list(ensure_list(x))),
    ] = Field(
        default=[],
        json_schema_extra={
            "title": "Access tokens",
            "description": "Tokens to access the API. Save profile after removing a token.",
            "type": "list:text",
            "input_extra": 'autocomplete="off" autocorrect="off" autocapitalize="off" spellcheck="false"',
            "form_id": f"access-tokens-{str(uuid4())}",
        },
    )
    # Anything whose string form equals "true" (case-insensitive) becomes
    # True; every other input becomes False.
    permit_auth_requests: Annotated[
        Literal[True, False],
        BeforeValidator(lambda x: str(x).lower() == "true"),
        AfterValidator(lambda x: str(x).lower() == "true"),
    ] = Field(
        default=True,
        json_schema_extra={
            "title": "Authentication requests",
            "description": "Allow other devices to issue authentication requests to active sessions via pop-up",
            "type": "radio",
            "input_extra": 'autocomplete="off"',
            "form_id": f"proxy-login-{str(uuid4())}",
        },
    )
    updated: str | None = None

    @field_validator("email", mode="before")
    @classmethod
    def email_validator(cls, v):
        """Normalize the email value before type validation.

        Returns "" for None/"" and otherwise the `ascii_email` attribute of
        the `validate_email` result (deliverability is not checked).

        Raises:
            PydanticCustomError: type "email_invalid" when validation fails.
        """
        if v in (None, ""):
            return ""
        try:
            return validate_email(v, check_deliverability=False).ascii_email
        # `Exception` instead of a bare `except:` so KeyboardInterrupt and
        # SystemExit are no longer converted into validation errors.
        except Exception as err:
            raise PydanticCustomError(
                "email_invalid",
                "The provided email address is invalid",
                dict(provided_email=v),
            ) from err

    @field_validator("access_tokens", mode="after")
    @classmethod
    def access_tokens_validator(cls, v):
        """Check every token against ACCESS_TOKEN_FORMAT.

        Raises:
            PydanticCustomError: type "access_tokens" for the first token
                that fails; the message shows only its first three
                characters.
        """
        # Build the adapter once instead of once per token.
        token_format = TypeAdapter(ACCESS_TOKEN_FORMAT)
        for s in v:
            try:
                token_format.validate_python(s)
            except ValidationError:
                # Never echo a full token back: keep three characters and
                # mask the remainder, if there is any.
                s_priv = s[:3] + ("***" if s[3:] else "")
                raise PydanticCustomError(
                    "access_tokens",
                    f"The provided token {s_priv} is invalid",
                    dict(),
                )
        return v

    @field_validator("tresor", mode="after")
    @classmethod
    def parse_tresor(cls, v):
        """Verify that a non-empty tresor is a JSON object with all keys.

        The value is returned unchanged; only its structure is checked.

        Raises:
            PydanticCustomError: type "tresor" when the value is not valid
                JSON, is not a JSON object, or lacks a required key.
        """
        if not v:
            return v

        required_keys = ("public_key_pem", "wrapped_private_key", "iv", "salt")
        try:
            tresor = json.loads(v)
        # RecursionError covers pathologically nested input.
        except (ValueError, TypeError, RecursionError):
            tresor = None

        # Explicit check instead of `assert`, which is stripped under
        # `python -O` and would then let invalid data through.
        if not isinstance(tresor, dict) or any(
            key not in tresor for key in required_keys
        ):
            raise PydanticCustomError(
                "tresor",
                "The provided tresor data is invalid",
                dict(),
            )
        return v
class User(BaseModel):
model_config = ConfigDict(validate_assignment=True)
id: Annotated[str, AfterValidator(lambda v: str(UUID(v)))]
login: constr(strip_whitespace=True, min_length=1)
credentials: dict[str, Credential] | Annotated[
str | list[str],
AfterValidator(lambda v: ensure_list(v)),
] = {}
acl: Annotated[
Literal[*USER_ACLS] | list[Literal[*USER_ACLS]],
AfterValidator(lambda v: ensure_list(v)),
]
groups: Annotated[
constr(strip_whitespace=True, min_length=1)
| list[constr(strip_whitespace=True, min_length=1)],
AfterValidator(lambda v: ensure_list(v)),
] = []
profile: UserProfile
created: str
updated: str
def _user_groups_normalize_members(value):
    # Run the value through ensure_list, then to_unique_sorted_str_list.
    as_list = ensure_list(value)
    return to_unique_sorted_str_list(as_list)


# Group model carrying a current name, a new name and a member list.
#
# Both names are required, whitespace-stripped and non-empty. `members`
# accepts a single string or a list and is normalized after validation.
class UserGroups(BaseModel):
    name: constr(strip_whitespace=True, min_length=1)
    new_name: constr(strip_whitespace=True, min_length=1)
    members: Annotated[
        str | list,
        AfterValidator(_user_groups_normalize_members),
    ] = []
class UserAdd(BaseModel):
login: str = constr(strip_whitespace=True, min_length=1)
credentials: list[str] = []
acl: list[Literal[*USER_ACLS]] = ["user"]
profile: UserProfile = UserProfile.parse_obj({})
groups: list[constr(strip_whitespace=True, min_length=1)] = []
@computed_field
@property
def id(self) -> str:
return str(uuid4())
@computed_field
@property
def created(self) -> str:
return utc_now_as_str()
@computed_field
@property
def updated(self) -> str:
return utc_now_as_str()
# Partial-update payload for a user record.
#
# `login` and `groups` default to None, `acl` and `credentials` default to an
# empty list. `updated` is a computed field returning the current UTC
# timestamp string whenever it is read.
class UserPatch(BaseModel):
    login: str | None = None
    acl: str | list = []
    credentials: str | list = []
    groups: str | list | None = None

    @computed_field
    @property
    def updated(self) -> str:
        # Evaluated on access, not stored on the instance.
        now = utc_now_as_str()
        return now
# Partial-update payload for a user profile.
#
# No field is required: `access_tokens` defaults to an empty list and the
# others default to None. `updated` is a computed field returning the current
# UTC timestamp string whenever it is read.
class UserProfilePatch(BaseModel):
    email: str | None = None
    tresor: str | None = None
    access_tokens: str | list = []
    permit_auth_requests: bool | None = None

    @computed_field
    @property
    def updated(self) -> str:
        # Evaluated on access, not stored on the instance.
        now = utc_now_as_str()
        return now
# Partial-update payload for a credential.
#
# Every declared field is optional and defaults to None. `updated` is a
# computed field returning the current UTC timestamp string whenever it is
# read.
class CredentialPatch(BaseModel):
    friendly_name: str | None = None
    active: bool | None = None
    last_login: str | None = None
    sign_count: int | None = None

    @computed_field
    @property
    def updated(self) -> str:
        # Evaluated on access, not stored on the instance.
        now = utc_now_as_str()
        return now
class UserSession(BaseModel):
id: str
login: str
acl: list | str
cred_id: str | None = None
lang: Literal[*ACCEPT_LANGUAGES] = "en"
profile: dict | UserProfile | None = {}
login_ts: float = Field(default_factory=ntime_utc_now)