Практика
Общая задача
Задача: Необходимо организовать защиту приложения со следующими требованиями:
- Авторизация по логину и паролю
- Разделение пользователей на группы
- Администратор (все права)
- Руководитель (просмотр статистики по всем направлениям)
- Руководитель направления (права в пределах направления, нет возможности просмотра других направлений)
- Руководитель предприятия (права в пределах предприятия, нет возможности просмотра других объектов)
- Разделение пользователей по ролям в соответствии с группами
- На backend есть API endpoint и HTML endpoint
- Неавторизованный пользователь имеет доступ к некоторым страницам без авторизации
Реализуем это при помощи декораторов из отдельного модуля. Желательно не хардкодить роли, должно быть внешнее хранилище.
Базовая авторизация
Реализуем следующий процесс авторизации:
Делаем страницу с проверкой, авторизован или нет пользователь. Если пользователь не авторизован, показываем кнопку Авторизация. Если авторизован - кнопку Выход. Кнопки отличаются ссылками и текстом.
Будем использовать модуль python-keycloak
pip install python-keycloak
Адресация серверов
192.168.1.3 web server и ПК, с которого я тестирую работу
192.168.1.195 keycloak server
Настройка keycloak.
Создаем realm для данного эксперимента. Назовем его pythonsimpletest.
В разделе Manage realms - Create
Теперь создаем клиента. Назовем его clientforsimpletest. Clients - Create client
Поскольку этот клиент доверенный и расположен на сервере, то Client authentication включаем,
И тут проявилась первая ошибка. Localhost виден с моего ПК. Поэтому, когда я прописал на сервере keycloak в разделе Root URL localhost - ничего не заработало. Похоже, что необходимо указывать имена/ip доступные с сервера keycloak а не только с браузера на ПК пользователя. Результирующие настройки клиента:
Создаем пользователя и задаем ему пароль.
Python клиент
from fastapi import FastAPI, Depends, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from uvicorn import run
from typing import Optional
from keycloak import KeycloakOpenID
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)
KEYCLOAK_URL = "http://192.168.1.195:9090/"
REALM_NAME = "pythonsimpletest"
CLIENT_ID = "clientforsimpletest"
CLIENT_SECRET = "Vix6txRyHwt81KyZIpl7O06CxpWMFtib"
REDIRECT_URI = "http://192.168.1.3:8100/auth/callback"
# Инициализация Keycloak клиента
keycloak_openid = KeycloakOpenID(
server_url=KEYCLOAK_URL,
client_id=CLIENT_ID,
realm_name=REALM_NAME,
client_secret_key=CLIENT_SECRET,
)
async def get_current_user_from_cookie(request: Request) -> Optional[dict]:
"""Извлечение и валидация пользователя из cookie"""
# Получаем токен из cookies
access_token = request.cookies.get("access_token")
if not access_token:
return None
try:
# Проверяем токен через Keycloak
userinfo = keycloak_openid.userinfo(access_token)
return userinfo
except Exception as e:
# Если токен просрочен или невалиден, удаляем его
# Но здесь мы не можем удалить cookie, это нужно делать в ответе
return None
def get_login_url():
"""Генерация URL для входа через Keycloak"""
auth_url = keycloak_openid.auth_url(
redirect_uri=REDIRECT_URI,
scope="openid email profile",
state="random_state_string" # В реальном приложении используйте генерацию случайной строки
)
return auth_url
@app.get("/", response_class=HTMLResponse)
async def simpleauth(request: Request):
user = await get_current_user_from_cookie(request)
if not user:
login_url = get_login_url()
html_content = f'<!DOCTYPE html><html><body><a href="{login_url}"><button>Авторизация</button></a></body></html>'
else:
html_content = '<!DOCTYPE html><html><body><a href="/logout"><button>Выход</button></a></body></html>'
return HTMLResponse(content=html_content, status_code=200)
@app.get("/auth/callback")
async def auth_callback(code: str):
"""Callback URL для обработки ответа от Keycloak после логина"""
try:
# Обмен кода на токены
token_response = keycloak_openid.token(
grant_type="authorization_code",
code=code,
redirect_uri=REDIRECT_URI
)
access_token = token_response.get('access_token')
# Создаем редирект с токеном в заголовке (через установку cookie)
response = RedirectResponse(url="/")
# Устанавливаем токен в cookie (альтернатива Authorization header для браузера)
response.set_cookie(
key="access_token",
value=access_token,
httponly=True, # Защита от XSS
secure=False, # True для HTTPS
samesite="lax"
)
refresh_token = token_response.get('refresh_token')
response.set_cookie(
key="refresh_token",
value=refresh_token,
httponly=True, # Защита от XSS
secure=False, # True для HTTPS
samesite="lax"
)
return response
except Exception as e:
return HTMLResponse(content=f"<h1>Ошибка авторизации: {str(e)}</h1>", status_code=400)
@app.get("/logout")
async def logout(request: Request):
"""Выход из системы"""
refresh_token = request.cookies.get("refresh_token")
# Логаут через python-keycloak
if refresh_token:
try:
keycloak_openid.logout(refresh_token)
except Exception as e:
print(f"Keycloak logout error: {e}")
response = RedirectResponse(url="/")
response.delete_cookie("access_token")
response.delete_cookie("refresh_token")
return response
if __name__ == '__main__':
run(app="simple:app", host='0.0.0.0', port=8100, workers=4, log_level='warning')
Добавление ролей
Теперь добавим scopes в токен. Даже просто добавление списка ролей оказалось той еще задачей. Во многих инструкциях будет сказано, что настройка маппера в Keycloak Admin Console -> ваш realm → Clients → ваш клиент -> Вкладка Mappers. Но это не так. Пример настройки для получения ролей:
- Перейди в Client Scopes: Твой realm → Clients → выбери своего клиента → вкладка Client scopes → кликни на дедикейтед скоп *-dedicated (например, my-client-dedicated).
- Открой вкладку Mappers.
- Нажми Configure a new mapper и выбери By configuration.
- Выбери тип маппера: В выпадающем списке выбери User Realm Role.
- Вот это ключевой момент: использование предустановленного типа, а не создание своего, гарантирует корректную внутреннюю структуру.
- Проверь настройки (они должны заполниться автоматически):
- Name: Оставь realm roles (или любое другое).
- Token Claim Name: Убедись, что здесь realm_access . Это единственное правильное имя поля.
- Add to userinfo: Убедись, что переключатель установлен в ON.
- Add to access token / Add to ID token: Можно оставить OFF, если роли не нужны в самих JWT токенах.
- Сохрани маппер (кнопка Save).
async def get_current_user_roles(request: Request) -> Optional[dict]:
"""Извлечение ролей"""
access_token = request.cookies.get("access_token")
if not access_token:
return []
try:
claims = jwt.get_unverified_claims(access_token)
roles = claims.get("realm_access", {}).get("roles", [])
return roles
except Exception:
return []
...
@app.get("/", response_class=HTMLResponse)
async def simpleauth(request: Request):
user = await get_current_user_from_cookie(request)
if not user:
login_url = get_login_url()
html_content = f'<!DOCTYPE html><html><body><a href="{login_url}"><button>Авторизация</button></a></body></html>'
else:
roles = await get_current_user_roles(request)
html_content = f'''<!DOCTYPE html><html><body><a href="/logout"><button>Выход</button></a>
<p>Доступные ключи: {user.keys()}</p>
<p>Доступные роли: {roles}</p>
</body></html>'''
return HTMLResponse(content=html_content, status_code=200)
Добавление декораторов
Почему-то для fastapi предпочтительнее использовать механизм Depends. Однако мне проще декораторы.
from functools import wraps
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.concurrency import run_in_threadpool
from uvicorn import run
from typing import Optional
from keycloak import KeycloakOpenID
from jose import jwt
from jose.exceptions import JWTError, ExpiredSignatureError
app = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)
KEYCLOAK_URL = "http://192.168.1.195:9090/"
REALM_NAME = "pythonsimpletest"
CLIENT_ID = "clientforsimpletest"
CLIENT_SECRET = "Vix6txRyHwt81KyZIpl7O06CxpWMFtib"
REDIRECT_URI = "http://192.168.1.3:8100/auth/callback"
KEYCLOAK_PUBLIC_KEY = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAtrnsNk3z21imvfNFDT8teaekNgFhnOSuKcZiPc1Iv+qknDg8X1zF7KJLtZMrCZSLRJM6T754F8KtNUSj0Ioa3ehYSPEdF4SX7tBdacpeDz0GlKWYFa845/e6H2349I+7EuO1bkHfWz/v6n2mm+jeZKU0ujqr2boBoPWwkMoKwXNMl/Sac0YMlv1fVHiISyREDG+FordAwYlbLVYCD36ckk0UnAKBc59Q36DMiKSx7JpdvR0vHIeWb4mQFlRbLdmWkK4ebUe+B/k1/cdd3LHdQ6vc1i45bMcJlrFYocDOGK99Mf8pT8OMPQvTJ8cTe9xSkCAx0l7YiSv+5hfK1C6DswIDAQAB
-----END PUBLIC KEY-----"""
ALGORITHMS = ["RS256"]
# Инициализация Keycloak клиента
keycloak_openid = KeycloakOpenID(
server_url=KEYCLOAK_URL,
client_id=CLIENT_ID,
realm_name=REALM_NAME,
client_secret_key=CLIENT_SECRET,
)
# ==============================Декораторы авторизации=========================
def verify_token(token: str):
return jwt.decode(
token,
KEYCLOAK_PUBLIC_KEY,
algorithms=ALGORITHMS,
audience="account",
options={
"verify_signature": True,
"verify_aud": False,
"verify_exp": True,
}
)
def get_login_url():
"""Генерация URL для входа через Keycloak"""
auth_url = keycloak_openid.auth_url(
redirect_uri=REDIRECT_URI,
scope="openid email profile",
state="random_state_string" # В реальном приложении используйте генерацию случайной строки
)
return auth_url
def session_required(func):
@wraps(func)
async def wrapper(*args, **kwargs):
request: Request = kwargs.get("request")
if request is None:
for arg in args:
if isinstance(arg, Request):
request = arg
break
if request is None:
raise RuntimeError("Request object not found")
token = request.cookies.get("access_token")
if not token:
return RedirectResponse(get_login_url())
try:
claims = verify_token(token)
request.state.user = claims
except ExpiredSignatureError:
response = RedirectResponse(get_login_url())
response.delete_cookie("access_token")
response.delete_cookie("refresh_token")
return response
except JWTError:
response = RedirectResponse(get_login_url())
response.delete_cookie("access_token")
response.delete_cookie("refresh_token")
return response
return await func(*args, **kwargs)
return wrapper
def html_norole():
html_content = f'''<!DOCTYPE html><html><body><a href="/"><button>На главную</button></a>
<p><a href="/logout"><button>Выход</button></a></p>
<p>У вас нет доступа к этой странице</p>
</body></html>'''
return HTMLResponse(content=html_content, status_code=200)
def role_secondrole_required(func):
@wraps(func)
async def wrapper(*args, **kwargs):
request: Request = kwargs.get("request")
if request is None:
for arg in args:
if isinstance(arg, Request):
request = arg
break
if request is None:
raise RuntimeError("Request object not found")
userinfo = request.state.user
roles = userinfo.get("realm_access", {}).get("roles", [])
if 'secondrole' not in roles:
return html_norole()
return await func(*args, **kwargs)
return wrapper
# =============================================================================
@app.get("/", response_class=HTMLResponse)
@session_required
async def simpleauth(request: Request):
userinfo = request.state.user #await get_current_user_from_cookie(request)
roles = userinfo.get("realm_access", {}).get("roles", [])
html_content = f'''<!DOCTYPE html><html><body><a href="/logout"><button>Выход</button></a>
<p>Доступные ключи: {userinfo}</p>
<p>Доступные роли: {roles}</p>
<p><a href="/secret"><button>Доступ к секрету</button></a></p>
</body></html>'''
return HTMLResponse(content=html_content, status_code=200)
@app.get("/secret", response_class=HTMLResponse)
@session_required
@role_secondrole_required
async def roleneeded(request: Request):
html_content = f'''<!DOCTYPE html><html><body><a href="/logout"><button>Выход</button></a>
<p><a href="/"><button>На домашнюю страницу</button></a></p>
</body></html>'''
return HTMLResponse(content=html_content, status_code=200)
@app.get("/auth/callback")
async def auth_callback(code: str):
"""Callback URL для обработки ответа от Keycloak после логина"""
def append_cookie(response, key, value):
response.set_cookie(
key=key,
value=value,
httponly=True, # Защита от XSS
secure=False, # True для HTTPS
samesite="lax"
)
return response
try:
# Обмен кода на токены
token_response = keycloak_openid.token(
grant_type="authorization_code",
code=code,
redirect_uri=REDIRECT_URI
)
# Создаем редирект с токеном в заголовке (через установку cookie)
response = RedirectResponse(url="/")
access_token = token_response.get('access_token')
# Устанавливаем токен в cookie (альтернатива Authorization header для браузера)
response = append_cookie(response, "access_token", access_token)
refresh_token = token_response.get('refresh_token')
response = append_cookie(response, "refresh_token", refresh_token)
return response
except Exception as e:
return HTMLResponse(content=f"<h1>Ошибка авторизации: {str(e)}</h1>", status_code=400)
@app.get("/logout")
async def logout(request: Request):
"""Выход из системы"""
refresh_token = request.cookies.get("refresh_token")
if refresh_token:
try:
keycloak_openid.logout(refresh_token)
except Exception as e:
print(f"Keycloak logout error: {e}")
response = RedirectResponse(url="/")
response.delete_cookie("access_token")
response.delete_cookie("refresh_token")
return response
if __name__ == '__main__':
run(app="02_withroles:app", host='0.0.0.0', port=8100, workers=4, log_level='warning')