pytoolkit.pipelines module¶
テーブルデータのパイプライン。
ここでいうパイプラインとは、pl.DataFrameを受け取り、pl.DataFrameを返すステップの連なり。 各ステップは0個以上の依存するステップを持ち、依存するステップの出力を結合したものを受け取って 当該ステップの処理を行う。
- ステップ:
各ステップはpytoolkit.pipelines.Stepクラスやその派生クラスを継承して実装する。 基本は1ファイル1ステップで実装する。 ステップ名の既定値は実装したクラスのモジュールのファイル名の拡張子を除く部分となる。
- ステップの実行の種類:
ステップの実行には、主にコンペでの利用を想定し、"train"と"test"の二種類がある。
- ステップの種類:
以下のパターンで実装を補助する派生クラスがある。
"train"と"test"で同じ処理をするステップ: TransformStep
"train"で事前に学習や統計情報の取得などの処理を行い、 その後は"train"と"test"で同じ処理をするステップ: FitTransformStep
コンペなどで"train"と"test"のデータを全部まとめて処理するステップ: AllDataStep
"train"で学習してモデルを保存してout-of-fold predictionsを出力し、 "test"でモデルを読み込んで推論をするステップ: ModelStep
"train"のみ実装されているステップ: TrainOnlyStep
ステップを実装する際は、Stepクラスか上記の派生クラスの中から1つを選んで継承する。
- メモリキャッシュ:
明示的に無効化していない場合、各ステップの結果はメモリにキャッシュされる。 依存関係などで同じステップが何度も呼び出される場合、2回目以降は初回の結果を返す。
- ファイルキャッシュ:
明示的に無効化していない場合、各ステップの結果はファイルにもキャッシュされる。 ステップや依存先のステップが定義されたファイルの更新日時が新しくなっていれば自動的に再計算する。 (ソースコード上の依存関係などまでは追えないため注意が必要。)
リファクタリングなどで意図せず再計算が発生する場合があるため、 非常に計算に時間がかかるステップを実装する場合などには、 更新日時のチェックを個別または全体で無効化できる。 無効化した場合、必要に応じて手動で削除する。
- fine:
HPOなどの時間がかかる処理はStepの中でself.fineがTrueの場合のみ行うよう実装し、 コンペの提出時などだけfine=Trueで実行する。 必要に応じてモデルやキャッシュが別管理となる。
- class pytoolkit.pipelines.Step[ソース]¶
ベースクラス:
object
パイプラインの各ステップ。
- has_fine_test[ソース]¶
fine=Trueな場合にtestで特別な処理を実装しているのか否か (既定値: False) has_fine_trainがFalseでhas_fine_testがTrueな場合はTTAなどを想定したもの。 モデルは共通だが推論結果のキャッシュは別となる。
サンプル
派生クラスでuse_file_cacheなどを変更したい場合は__init__をオーバーライドする。
def __init__(self) -> None: super().__init__() self.depends_on = [] self.use_file_cache = True self.use_memory_cache = True self.check_file_cache_time = True self.has_fine_train = False self.has_fine_test = False
- has_fine(run_type)[ソース]¶
fine=Trueな場合に特別な処理を実装しているのか否か
- パラメータ:
run_type (Literal['train', 'test']) --
- 戻り値の型:
bool
- abstract run(df, run_type)[ソース]¶
当該ステップの処理。Pipeline経由で呼び出される。
- パラメータ:
df (DataFrame) --
run_type (Literal['train', 'test']) --
- 戻り値の型:
DataFrame
- invoke_single(step_types, invoke_type=None, cache='use')[ソース]¶
指定ステップの実行。(結果が1列なことがわかっているとき用の糖衣構文)
- パラメータ:
step_types (str | list[str]) -- 実行するステップのクラス
run_type -- 実行するステップの種類
cache (Literal['use', 'ignore', 'disable']) -- ignoreにするとキャッシュがあっても読み込まない、disableにすると保存もしない。(伝播はしない)
invoke_type (Literal['train', 'test'] | ~typing.Literal['all'] | None) --
- 戻り値:
実行結果
- 戻り値の型:
Series
- invoke(step_types, invoke_type=None, cache='use')[ソース]¶
指定ステップの実行。
- パラメータ:
step_types (str | list[str]) -- 実行するステップのクラス
run_type -- 実行するステップの種類
cache (Literal['use', 'ignore', 'disable']) -- ignoreにするとキャッシュがあっても読み込まない、disableにすると保存もしない。(伝播はしない)
invoke_type (Literal['train', 'test'] | ~typing.Literal['all'] | None) --
- 戻り値:
実行結果
- 戻り値の型:
DataFrame
- class pytoolkit.pipelines.Pipeline(models_dir, cache_dir, fine=False, check_file_cache_time=True)[ソース]¶
ベースクラス:
object
パイプラインを管理するクラス。
- パラメータ:
models_dir (str | PathLike[str]) -- モデルやログの保存先ディレクトリ
cache_dir (str | PathLike[str]) -- キャッシュの保存先ディレクトリ
fine (bool) -- 高精度な学習・推論を行うのか否か
check_file_cache_time (bool) -- ファイルキャッシュの簡易更新日時チェックをするのか否か (既定値: True)
- invoke(steps, invoke_type, cache='use')[ソース]¶
ステップの実行。
- パラメータ:
steps (str | list[str] | Step | list[pytoolkit.pipelines.Step]) -- 実行するステップの名前 or インスタンス。複数指定可。
invoke_type (Literal['train', 'test'] | ~typing.Literal['all'] | None) -- 実行するステップの種類。省略時は現在実行中のステップ。
cache (Literal['use', 'ignore', 'disable']) -- ignoreにするとキャッシュがあっても読み込まない、disableにすると保存もしない。(伝播はしない)
- 戻り値:
処理結果
- 戻り値の型:
DataFrame
- class pytoolkit.pipelines.TransformStep[ソース]¶
ベースクラス:
Step
データ変換ステップ。
サンプル
class Step(pytoolkit.pipelines.TransformStep): def __init__(self): super().__init__() self.depends_on = ["xxx"] def transform(self, df: pl.DataFrame) -> pl.DataFrame: return df
- class pytoolkit.pipelines.FitTransformStep[ソース]¶
ベースクラス:
Step
特徴量作成ステップ。
サンプル
class Step(pytoolkit.pipelines.FitTransformStep): def __init__(self): super().__init__() self.depends_on = ["xxx"] def fit(self, df_train: pl.DataFrame) -> typing.Any: transformer = {} return transformer def transform(self, transformer, df: pl.DataFrame) -> pl.DataFrame: return df
- class pytoolkit.pipelines.AllDataStep[ソース]¶
ベースクラス:
Step
trainで全データを使って処理するステップ。
self.is_test_column_name の bool列を追加して結合したものがdf_allとして渡される。
サンプル
class Step(pytoolkit.pipelines.AllDataStep): def __init__(self): super().__init__() self.depends_on = ["xxx"] def fit_transform(self, df_all: pl.DataFrame) -> pl.DataFrame: return df_all
- class pytoolkit.pipelines.ModelStep[ソース]¶
ベースクラス:
Step
機械学習モデルなど、train/testで別々の処理をするステップ。
サンプル
class Step(pytoolkit.pipelines.ModelStep): def __init__(self): super().__init__() self.depends_on = ["xxx"] def train(self, df_train: pl.DataFrame) -> pl.DataFrame: return df_train def test(self, df_test: pl.DataFrame) -> pl.DataFrame: return df_test
- class pytoolkit.pipelines.TrainOnlyStep[ソース]¶
ベースクラス:
Step
グループIDやラベル周りの処理など、"train"のみ実装されているステップ。
サンプル
class Step(pytoolkit.pipelines.TrainOnlyStep): def __init__(self): super().__init__() self.depends_on = ["xxx"] def train(self, df_train: pl.DataFrame) -> pl.DataFrame: return df_train