Airflowで条件付きタスクを作成する方法
Airflowで下記のスキーマのような条件付きタスクを作りたいと思います。想定されるシナリオは以下の通りです。
- タスク1が実行される
- タスク1が成功したら、タスク2aを実行する。
- タスク1が失敗した場合、タスク2bを実行する。
- 最後にタスク3が実行される
上記のタスクは全てSSHExecuteOperatorです。 ShortCircuitOperatorやXComを使って条件を管理すればよいのでしょうが、どのように実装すればよいのかがわかりません。解決方法を教えてください。
43
2
AirflowにはBranchPythonOperatorがあり、これを使うことでより直接的に分岐の依存関係を表現することができます。
docs](https://airflow.incubator.apache.org/concepts.html?highlight=branch#branching)にその使い方が書かれています。
BranchPythonOperator は PythonOperator と似ていますが、task_id を返す python_callable を期待する点が異なります。返されたtask_idが追従し、他のすべてのパスはスキップされます。Python関数が返すtask_idは、BranchPythonOperatorタスクの直接下流のタスクを参照する必要があります。
コード例
EDIT:
Airflowのバージョン >=1.10.3 をインストールしている場合、タスクIDのリストを返す ことで、1つのOperatorで複数の下流パスをスキップしたり、結合前のダミータスクを使用しない ことも可能です。
気流トリガールール]1を使用する必要があります。
すべてのオペレータは、生成されたタスクがトリガーされるルールを定義するtrigger_rule引数を持っています。
トリガールールの可能性。
ここで、あなたの問題を解決するためのアイデアを紹介します。