Cara membuat bersyarat tugas dalam aliran Udara

Saya ingin membuat sebuah bersyarat tugas di aliran Udara seperti dijelaskan dalam skema di bawah ini. Skenario yang diharapkan adalah sebagai berikut:

  • Tugas 1 mengeksekusi
  • Jika Tugas 1 berhasil, kemudian jalankan Tugas 2a
  • Lain Jika Tugas 1 gagal, maka melaksanakan Tugas 2b
  • Akhirnya mengeksekusi Tugas 3

Semua tugas di atas adalah SSHExecuteOperator. I'm menebak saya harus menggunakan ShortCircuitOperator dan / atau XCom untuk mengelola kondisi ini, tapi saya tidak jelas tentang bagaimana untuk melaksanakan itu. Bisa anda jelaskan solusinya?

Aliran udara yang memiliki BranchPythonOperator yang dapat digunakan untuk menyatakan percabangan ketergantungan yang lebih langsung.

The docs menjelaskan penggunaannya:

BranchPythonOperator jauh seperti PythonOperator kecuali bahwa ia mengharapkan python_callable yang kembali task_id. Yang task_id kembali diikuti, dan semua jalan-jalan lain yang dilewati. Yang task_id yang dikembalikan oleh fungsi Python telah menjadi referensi tugas ini diatas hilir dari BranchPythonOperator tugas.

...

Jika anda ingin melewati beberapa tugas, perlu diingat bahwa anda tidak dapat memiliki kosong di jalan, jika demikian membuat dummy tugas.

Contoh Kode

def dummy_test():
    return 'branch_a'

A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)

branch_task = BranchPythonOperator(
    task_id='branching',
    python_callable=dummy_test,
    dag=dag,
)

branch_task >> A_task 
branch_task >> B_task

EDIT:

Jika anda're memasang aliran Udara versi >=1.10.3, anda juga dapat kembali daftar tugas id, yang memungkinkan anda untuk melewatkan beberapa hilir jalan di satu Operator dan don't menggunakan dummy tugas sebelum bergabung.

Komentar (2)
Larutan

Anda harus menggunakan aliran udara yang memicu aturan

Semua operator memiliki trigger_rule argumen yang mendefinisikan aturan yang dihasilkan tugas terpicu.

Pemicu aturan kemungkinan:

ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'

Berikut ini adalah ide untuk memecahkan masalah anda:

from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook

sshHook = SSHHook(conn_id=)

task_1 = SSHExecuteOperator(
        task_id='task_1',
        bash_command=,
        ssh_hook=sshHook,
        dag=dag)

task_2 = SSHExecuteOperator(
        task_id='conditional_task',
        bash_command=,
        ssh_hook=sshHook,
        dag=dag)

task_2a = SSHExecuteOperator(
        task_id='task_2a',
        bash_command=,
        trigger_rule=TriggerRule.ALL_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)

task_2b = SSHExecuteOperator(
        task_id='task_2b',
        bash_command=,
        trigger_rule=TriggerRule.ALL_FAILED,
        ssh_hook=sshHook,
        dag=dag)

task_3 = SSHExecuteOperator(
        task_id='task_3',
        bash_command=,
        trigger_rule=TriggerRule.ONE_SUCCESS,
        ssh_hook=sshHook,
        dag=dag)

task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)
Komentar (5)