Coverage for app/core/base/base_usecase.py: 100%
7 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-15 01:44 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-15 01:44 +0000
1from abc import ABCMeta, abstractmethod
2from typing import TYPE_CHECKING, Generic, TypeVar
4if TYPE_CHECKING:
5 from .base_input_dto import BaseInputDTO
6 from .base_output_dto import BaseOutputDTO
8# TypeVarsを定義
9BaseInputDTOType = TypeVar("BaseInputDTOType", bound="BaseInputDTO", default="BaseInputDTO")
10BaseOutputDTOType = TypeVar("BaseOutputDTOType", bound="BaseOutputDTO", default="BaseOutputDTO")
11# BaseStreamOutputDTOType = TypeVar("BaseStreamOutputDTOType", bound="BaseStreamOutputDTO", default="BaseStreamOutputDTO")
13# UseCase
14class BaseUsecase(Generic[BaseInputDTOType, BaseOutputDTOType], metaclass=ABCMeta):
15 """
16 ユースケースのベースポリモーフィズム.
18 具象クラスではexecuteを実装すること.
19 """
21 @abstractmethod
22 def execute(self, input_dto: BaseInputDTOType) -> BaseOutputDTOType:
23 """BaseUsecaseを実行する抽象メソッド."""
24 raise NotImplementedError
26# # StreamUseCase
27# class BaseStreamUsecase(Generic[BaseInputDTOType, BaseStreamOutputDTOType], metaclass=ABCMeta):
28# """
29# ストリームユースケースのベースポリモーフィズム.
31# AsyncGeneratorで型が変わるので分ける
32# 具象クラスではexecuteを実装すること.
33# """
35# @abstractmethod
36# async def execute(self, input_dto: BaseInputDTOType) -> AsyncGenerator[BaseStreamOutputDTOType]:
37# """Usecaseを実行する抽象メソッド."""
38# raise NotImplementedError
40# # mypy対策のおまじない
41# # async def メソッドがデフォルトで Coroutine 型として推論されることの対策
42# # https://mypy.readthedocs.io/en/stable/more_types.html#asynchronous-iterators
43# if False:
44# yield 0
47# async def sse_stream(self, input_dto: BaseInputDTOType) -> AsyncGenerator[str]:
48# """
49# UsecaseのexecuteをSSE形式に変換するメソッド.
51# TODO(nonomura): エラーハンドリングを検討する
52# """
53# # 遅延起動する場合
54# # import asyncio
55# # await asyncio.sleep(0.01)
56# async for dto in self.execute(input_dto):
57# yield dto.to_sse() # DTOをSSE形式の文字列に変換して返す