Coverage for app/ddd/infrastructure/auth/jwt_token.py: 100%

30 statements  


1import secrets 

2from datetime import UTC, datetime, timedelta 

3from typing import Any 

4 

5import jwt 

6from fastapi import HTTPException, status 

7from jwt.exceptions import InvalidTokenError 

8 

# Keycloak integration settings. The client id/secret are disabled and the
# two URL settings hold the placeholder value 0.
# APP_CLIENT_ID = "dev-client"
# APP_CLIENT_SECRET = "example-secret"
APP_REDIRECT_URI = 0
KEYCLOAK_TOKEN_URL = 0

# HMAC signing key: 32 random bytes, hex-encoded.
# Development only: the key is regenerated every time this module is
# imported, so tokens signed by one process cannot be verified by another
# process or after a restart. The disabled line below reads a stable key
# from the environment instead.
SECRET_KEY = secrets.token_hex(32)
# SECRET_KEY = os.environ["SECRET_KEY"]

# Base64 body of a public key, wrapped into PEM form just below. In this file
# it is referenced only by the commented-out decode call in get_jwt_data.
# NOTE(review): presumably the Keycloak realm's RSA key - confirm.
PUBLIC_KEY = "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAw5L4rVichlm99HrLFNlswTgNxW8fnCr3jDS9FWwNjWIYssYdPHstbamePhGgMZ5Xj/4p34dNxxqWKDafnigfcRMn36KdklN2ohMzBIQWA9kMKg3G94YO3x8f39+L09CqTZhNl1LGGL3FhO5LJG+5bUw2yJTQiCPyEupHE3oIaYY5oBxMLTE4I4G35VQ/4in9Y64vTWPO9u1T+QXMDA8boMS6/pvKJDrln0XXBG6ITr7aLUEVrEQD+3tC20DnSTD4DbjZ5g0C7u+AA+0qZ1uKothirILsUgIHUv/s3OvybFc9FvvNuVe1HVggQQuVkJJ1Bj41WXJV82Tmy6JuKdIadQIDAQAB"
public_key = "\n".join(
    (
        "-----BEGIN PUBLIC KEY-----",
        PUBLIC_KEY,
        "-----END PUBLIC KEY-----",
    )
)

# Signing algorithm used when issuing tokens.
# RS256 is the asymmetric scheme (RSA key pair, verified with the public key);
# HS256 is the symmetric scheme (HMAC with a shared secret, here SECRET_KEY).
# ALGORITHM = "RS256"
ALGORITHM = "HS256"
# ACCESS_TOKEN_EXPIRE_MINUTES = 5

# PyJWT decode options that switch off audience validation. Every
# `options=options` argument in this file is commented out, so this dict is
# not used by the visible code.
options = {"verify_aud": False}

# Shared 401 response raised whenever a bearer token fails validation. The
# detail text is generic, so the client is not told why the token was rejected.
credentials_exception = HTTPException(
    status_code=status.HTTP_401_UNAUTHORIZED,
    detail="Could not validate credentials",
    headers={"WWW-Authenticate": "Bearer"},
)

37 

def create_access_token(data: dict[str, Any], expires_delta: timedelta) -> str:
    """Issue a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed. The mapping is copied, so the caller's dict is
            not modified; an ``exp`` key supplied by the caller is overwritten.
        expires_delta: Token lifetime, added to the current UTC time.

    Returns:
        The encoded token, signed with the module-level ``SECRET_KEY`` using
        ``ALGORITHM``.
    """
    # There is no default lifetime: the 15-minute fallback that used to sit
    # here is disabled, so every caller must pass expires_delta explicitly.
    claims = {**data, "exp": datetime.now(UTC) + expires_delta}
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)

47 

48# def create_access_token_keycloak(username: str, password: str) -> str: 

49# params = { 

50# 'client_id': APP_CLIENT_ID, 

51# 'client_secret': APP_CLIENT_SECRET, 

52# 'grant_type': 'password', 

53# 'username': username, 

54# 'password': password, 

55# } 

56# import requests, json 

57# from pprint import pprint 

58# import ast 

59# x = requests.post(KEYCLOAK_TOKEN_URL, params, verify=False).content.decode('utf-8') 

60# pprint(json.loads(x)) 

61# token_response = ast.literal_eval(x) 

62# pprint(token_response['id_token']) 

63# return token_response 

64 

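def get_jwt_data(access_token: str) -> dict[str, Any]:
    """Verify an HS256-signed access token and return its claims.

    Args:
        access_token: Encoded JWT as presented by the client.

    Returns:
        The decoded claim set.

    Raises:
        HTTPException: The module-level 401 ``credentials_exception`` when
            PyJWT rejects the token (``InvalidTokenError`` or a subclass).
            The original error is chained as ``__cause__`` so it stays
            available to server-side error handling without reaching the
            client.
    """
    # The raw bearer token and the decoded claims are deliberately not written
    # to stdout: anyone who can read process output or collected logs could
    # replay a printed token until it expires.
    try:
        # Disabled alternative: verify against the PEM `public_key` with
        # algorithms=[ALGORITHM] (the RS256 / Keycloak path).
        payload: dict[str, Any] = jwt.decode(
            jwt=access_token,
            key=SECRET_KEY,
            # Literal allow-list: only HS256 is accepted, whatever the
            # token's own header claims.
            algorithms=["HS256"],
            # NOTE(review): audience and issuer are not validated; the
            # options=/audience=/issuer= arguments are disabled here.
            # options=options,
            # audience="dev-client",
            # issuer="http://localhost:38080/auth/realms/dev-realm"
        )
    except InvalidTokenError as e:
        raise credentials_exception from e
    return payload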