Coverage for app/ddd/infrastructure/auth/jwt_token.py: 100%
30 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-15 01:44 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-15 01:44 +0000
1import secrets
2from datetime import UTC, datetime, timedelta
3from typing import Any
5import jwt
6from fastapi import HTTPException, status
7from jwt.exceptions import InvalidTokenError
9# keycloak
10# APP_CLIENT_ID = "dev-client"
11# APP_CLIENT_SECRET = "example-secret"
12APP_REDIRECT_URI = 0
13KEYCLOAK_TOKEN_URL = 0
15# 32バイトの秘密鍵を生成
16SECRET_KEY = secrets.token_hex(32) # 開発用途の毎回作成キー
18# SECRET_KEY = os.environ["SECRET_KEY"]
19PUBLIC_KEY = "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAw5L4rVichlm99HrLFNlswTgNxW8fnCr3jDS9FWwNjWIYssYdPHstbamePhGgMZ5Xj/4p34dNxxqWKDafnigfcRMn36KdklN2ohMzBIQWA9kMKg3G94YO3x8f39+L09CqTZhNl1LGGL3FhO5LJG+5bUw2yJTQiCPyEupHE3oIaYY5oBxMLTE4I4G35VQ/4in9Y64vTWPO9u1T+QXMDA8boMS6/pvKJDrln0XXBG6ITr7aLUEVrEQD+3tC20DnSTD4DbjZ5g0C7u+AA+0qZ1uKothirILsUgIHUv/s3OvybFc9FvvNuVe1HVggQQuVkJJ1Bj41WXJV82Tmy6JuKdIadQIDAQAB"
20public_key = "-----BEGIN PUBLIC KEY-----\n" \
21 + PUBLIC_KEY \
22 + "\n-----END PUBLIC KEY-----"
23# ALGORITHM = "RS256" # 共通鍵方式
24ALGORITHM = "HS256" # 秘密鍵方式
25# ACCESS_TOKEN_EXPIRE_MINUTES = 5
27options = {
28 "verify_aud": False
29}
32credentials_exception = HTTPException(
33 status_code=status.HTTP_401_UNAUTHORIZED,
34 detail="Could not validate credentials",
35 headers={"WWW-Authenticate": "Bearer"},
36)
38def create_access_token(data: dict[str, Any], expires_delta: timedelta) -> str:
39 to_encode = data.copy()
40 expire = datetime.now(UTC) + expires_delta
41 # if expires_delta:
42 # expire = datetime.now(UTC) + expires_delta
43 # else:
44 # expire = datetime.now(UTC) + timedelta(minutes=15) # default 15 min
45 to_encode.update({"exp": expire})
46 return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
48# def create_access_token_keycloak(username: str, password: str) -> str:
49# params = {
50# 'client_id': APP_CLIENT_ID,
51# 'client_secret': APP_CLIENT_SECRET,
52# 'grant_type': 'password',
53# 'username': username,
54# 'password': password,
55# }
56# import requests, json
57# from pprint import pprint
58# import ast
59# x = requests.post(KEYCLOAK_TOKEN_URL, params, verify=False).content.decode('utf-8')
60# pprint(json.loads(x))
61# token_response = ast.literal_eval(x)
62# pprint(token_response['id_token'])
63# return token_response
65def get_jwt_data(
66 access_token: str
67 ) -> dict[str, Any]:
68 print(access_token)
69 try:
70 # 公開鍵
71 # payload: dict[str, Any] = jwt.decode(
72 # jwt=access_token,
73 # key=public_key,
74 # algorithms=[ALGORITHM],
76 # # options=options,
77 # # audience="dev-client",
78 # # issuer="http://localhost:38080/auth/realms/dev-realm"
79 # )
80 # 共通鍵
81 payload: dict[str, Any] = jwt.decode(
82 jwt=access_token,
83 key=SECRET_KEY,
84 algorithms=["HS256"],
85 # options=options,
86 # audience="dev-client",
87 # issuer="http://localhost:38080/auth/realms/dev-realm"
88 )
89 print("payload")
90 print(payload)
91 except InvalidTokenError as e:
92 print("error")
93 print(e)
94 raise credentials_exception from e
95 return payload