68 lines
2.0 KiB
Python
68 lines
2.0 KiB
Python
import secrets
|
|
from datetime import datetime
|
|
from typing import Optional, Union
|
|
|
|
from async_property import async_property
|
|
|
|
from cpl.auth.permission.permissions import Permissions
|
|
from cpl.core.environment.environment import Environment
|
|
from cpl.core.log.logger import Logger
|
|
from cpl.core.typing import Id, SerialId
|
|
from cpl.core.utils.credential_manager import CredentialManager
|
|
from cpl.database.abc.db_model_abc import DbModelABC
|
|
from cpl.dependency import get_provider
|
|
from cpl.dependency.service_provider import ServiceProvider
|
|
|
|
_logger = Logger(__name__)
|
|
|
|
|
|
class ApiKey(DbModelABC):
|
|
|
|
def __init__(
|
|
self,
|
|
id: SerialId,
|
|
identifier: str,
|
|
key: Union[str, bytes],
|
|
deleted: bool = False,
|
|
editor_id: Optional[Id] = None,
|
|
created: Optional[datetime] = None,
|
|
updated: Optional[datetime] = None,
|
|
):
|
|
DbModelABC.__init__(self, id, deleted, editor_id, created, updated)
|
|
self._identifier = identifier
|
|
self._key = key
|
|
|
|
@property
|
|
def identifier(self) -> str:
|
|
return self._identifier
|
|
|
|
@property
|
|
def key(self) -> str:
|
|
return self._key
|
|
|
|
@property
|
|
def plain_key(self) -> str:
|
|
return CredentialManager.decrypt(self.key)
|
|
|
|
@async_property
|
|
async def permissions(self):
|
|
from cpl.auth.schema._permission.api_key_permission_dao import ApiKeyPermissionDao
|
|
|
|
apiKeyPermissionDao = get_provider().get_service(ApiKeyPermissionDao)
|
|
|
|
return [await x.permission for x in await apiKeyPermissionDao.find_by_api_key_id(self.id)]
|
|
|
|
async def has_permission(self, permission: Permissions) -> bool:
|
|
return permission.value in [x.name for x in await self.permissions]
|
|
|
|
def set_new_api_key(self):
|
|
self._key = self.new_key()
|
|
|
|
@staticmethod
|
|
def new_key() -> str:
|
|
return CredentialManager.encrypt(f"api_{secrets.token_urlsafe(Environment.get("API_KEY_LENGTH", int, 64))}")
|
|
|
|
@classmethod
|
|
def new(cls, identifier: str) -> "ApiKey":
|
|
return ApiKey(0, identifier, cls.new_key())
|