GoogleによるフルマネージドAirflow環境であるCloud Composerを使う必要があり、
いそぎでAirflow+Cloud Composerをキャッチアップすることになりました。
Googleが公開するベスプラ集があることを知り、読んでみることにしました。
Cloud Composerと題されていいますが、ほぼAirflowと読み替えて良いのかなと思います。
書かれているのは、少し基本的なシナリオだと思います。
経験に裏付けられたゴリゴリの集合知、というものはタダでは手に入らないのだろうと思います。
スタート地点に立つ際の道しるべ、ぐらいの気持ちです。
おそらく一緒に使うシナリオが多いであろう、データ変換ツールのdbtと競合するものがあります。
大構造としてはAirflow DAGの下にdbt DAGが来るため、Airflow DAGのベスプラを実現する前提で
dbt DAGを書いていくものと考えていました。これだけだとバッティングすると思います。
ウェアハウスとは切り離されています。特にBigQueryを前提にするならもう少し踏み込んだ内容と
なるはずだと思いますが、ちょっと書かれていないようです。
いったん半分くらい読んでみたので読書感想文を書いてみました。
【目次】
- ∨ファイル名を標準化します
- ∨DAG は決定的でなければなりません
- ∨DAG はべき等でなければなりません
- ∨タスクはアトミック、かつ、べき等でなければなりません
- ∨可能な限り単純な DAG にします
- ∨Python の docstring 規則に従って、各関数のファイルの上部にドキュメントを記述してください
- ∨default_args にオーナーを追加します。
- ∨dag = DAG() ではなく with DAG() as dag を使用します。
- ∨DAG ID 内にバージョンを設定します。
- ∨DAG にタグを追加します。
- ∨DAG の説明を追加します。
- ∨作成時には DAG を一時停止します。
- ∨catchup=Falseに設定して自動キャッチアップによるCloud Composer環境の過負荷を避けます。
- ∨DAGが完了せずにCloud Composer環境のリソースが保持されることや、再試行時に競合が引き起こされることのないよう、 dagrun_timeout を設定します
- ∨インスタンス化で DAG に引数を渡し、すべてのタスクにデフォルトで同じ start_date が設定されるようにします
- ∨DAG では静的な start_date を使用します。
- ∨retries を、DAG レベルで適用される default_args として設定します。
- ∨適切な再試行回数は 1~4 回です。再試行回数が多すぎると、Cloud Composer 環境に不要な負荷がかかります。
- ∨ここまでのまとめ
はじめに
ファイル名を標準化します
Workflowの特徴を個別に表現するだけでは不十分で、ファイル名が含む部分文字列がサブ機能や属性のインデックスとなっていて欲しい。さらに、ファイル名から機能概要を逆引き推測できたら便利。
作成した DAG ファイルのコレクションを他のデベロッパーが容易に参照できるようにするためです。例: team_project_workflow_version.py
DAG は決定的でなければなりません
入力となるデータが同じであれば、出力は常に同じであるべき、という観点だと思う。
例えば、入力となるデータは同じであっても、実行時間に依存して処理を行ってしまうと、
出力が時間依存となる。テストが無茶苦茶大変になるだろうなと思う。
Airflow DAG単体であれば、そう理解に難しくないポイントだとは思う。
しかし、dbt DAGを含めると一気に縛りがキツくなると思う。
特定の入力によって常に同じ出力が生成される必要があります
DAG はべき等でなければなりません
大雑把に書けば、ある操作を1回行っても数回行っても結果が同じであることを言う。
これを実現する仕組みの選択は結構悩ましく、「追加する範囲をいったん削除追加する」が簡単。
しかし、この方法だと無駄なスキャン量が発生する。
dbtを使用する場合、incremental modelがべき等性担保の手段として挙げられることが多いが、
実際にべき等性を担保するには考慮しないといけないことがある。
こちらの記事(dbtで「Incremental」を使わずに冪等性を担保する方法について)が詳しい。
DAG を何度トリガーしても、毎回同じ効果 / 結果が得られなければなりません
例えば、以下のように書けば、入力テーブルが変わらない限りべき等となる。
これを行うには、入力テーブルに「ロード日時」といったメタデータが必要となる。
1 2 3 4 5 6 7 |
{{ config( materialized="incremental" ) }} ・・・ {%- if is_incremental() %} WHERE ORDERDATE = TO_DATE('{{ var('load_date') }}') {%- endif %} |
タスクはアトミック、かつ、べき等でなければなりません
ちょっと何言っているかわからない書きっぷり。データベースのACID特性のAtomic性を意識する。
ある操作が一連の処理の完了によって達成される場合、部分的に処理の一部が成功する、という
状態になってしまってはいけない。全ての処理が成功したか、全ての処理が失敗したか、のどちらか、
になっていないといけない。
タスクごとに、他のオペレーションとは独立して再実行できる 1つのオペレーションを処理するようにします。タスクがアトミックな場合、そのタスクの一部が成功したことは、タスク全体が成功したことを意味します。
可能な限り単純な DAG にします
ちょっと一般的すぎて良くわからない。
「スケジューリングのコスト」って、「実行のコスト」よりもだいぶ小さいんじゃなかろうか、と思うが、
それでも意識しないといけないのだろうか。
ネストされたツリー構造は、そもそも理解しづらくて避けるべきだろう、とは思う。
タスク間の依存関係が少ない単純な DAG にすると、オーバーヘッドが少なくなるため、スケジューリングのパフォーマンスが向上する傾向があります。一般的に、多数の依存関係がある深くネストされたツリー構造よりも線形構造(例: A->B->C)にしたほうが、効率的な DAG になります。
Python の docstring 規則に従って、各関数のファイルの上部にドキュメントを記述してください
AirflowはPythonで書けることが最大の特徴なので、そのメリットを発揮するため、docstringでコメント書けよと。
Python の docstring 規則は、他のデベロッパーやプラットフォーム エンジニアが Airflow DAG を理解するために役立ちます。
関数と同様に BashOperator のドキュメントも作成するようにしてください。DAG で bash スクリプトを参照している場合、スクリプトの目的を記したドキュメントがないと、このスクリプトに詳しくないデベロッパーにはトラブルシューティングが困難です。
DAG の作成を標準化する
default_args にオーナーを追加します。
タスクを得る(Operatorをインスタンス化する)際に、各Operatorのコンストラクタに引数を与えるが、
複数のOperatorに渡す引数を共通化したい場合には、default_argsをDAG()に与える。
こうすると、Operatorにdefault_argsで設定した引数を与えたことになる。
各Operatorに引数を与えると、defautl_argsをオーバーライドする動作となる。
過去の公式でdefault_argsは、task_id と owner が mandatory(必須) であるとされている。
これについて、Why is ‘owner’ a mandatory argument for tasks and dags? という記事がある。
それに続くPRは More detail on mandatory task arguments であり、mandatoryの根拠を聞いている。
歴史的な理由による、で片付いているな。ベスプラではownerに実装者のメアドなどを書けという。
実装担当者を明らかにせよ、という話であればcomitterを見れば良いだけで、ちょっと意味不明。
mandatoryなので、何か入れないといけないなら、とりあえず実装者のメアドを入れておけ、ということか。
1 2 3 4 5 6 7 8 9 10 11 |
import pendulum with DAG( dag_id='my_dag', start_date=pendulum.datetime(2016, 1, 1, tz="UTC"), schedule_interval='@daily', catchup=False, default_args={'owner': 'hoge@ikuty.com'}, ) as dag: op = BashOperator(task_id='dummy', bash_command='Hello World!') print(op.retries) # 2 |
dag = DAG() ではなく with DAG() as dag を使用します。
Pythonのwith文の仕様。コンテキストマネージャという言う。
try… except… finally をラップするため、リソースの確保と対応する解放が必ず行われる。
with DAG(…):文の下のインデントの中では、各Operatorのコンストラクタにdagインスタンスを
渡さなくてよくなる。
すべてのオペレーターまたはタスクグループに DAG オブジェクトを渡す必要がなくなるようにします。
DAG ID 内にバージョンを設定します。
以下とのこと。あぁ..バージョニングが実装されていないので手動でバージョニングを行うべし、と。
AirflowはDAGをファイルIDで管理しているため、ファイルIDを変更するとUI上、別のDAGとして扱われるよう。
積極的にDAG IDを変更して、Airflow UIに無駄な情報を出さないようにする、というアイデア。
- DAG 内のコードを変更するたびにバージョンを更新します。
- こうすると、削除されたタスクログが UI に表示されなくなることや、ステータスのないタスクが古い DAG 実行に対して生成されること、DAG 変更時の一般的な混乱を防ぐことができます。
- Airflow オープンソースには、将来的にバージョニングが実装される予定です。
DAG にタグを追加します。
公式はこちら。Add tags to DAGs and use it for filtering in the UI
単にUI上の整理のために留まらず、処理の記述に積極的に使うことができる様子。
これを無秩序に使うと扱いづらくなりそうなので、使う場合は用途を明確にしてから使うべきかと思う。
1. デベロッパーがタグをフィルタして Airflow UI をナビゲートできるようにします。
2. 組織、チーム、プロジェクト、アプリケーションなどを基準に DAG をグループ化します。
DAG の説明を追加します。
唐突で非常に当たり前なのだが、あえて宣言することが大事なんだろうと思う。
他のデベロッパーが自分の DAG の内容を理解できるようにします。
作成時には DAG を一時停止します。
なるほど。
こうすると、誤って DAG が実行されて Cloud Composer 環境の負荷が増すという事態を回避できます。
catchup=Falseに設定して自動キャッチアップによるCloud Composer環境の過負荷を避けます。
まず、catchupの前に、Airflowの実行タイミングが直感的でなさそう。
こちらの記事がとても参考になった。【Airflow】DAG実行タイミングを改めて纏めてみた
DAGの実行タイミングはstart_dateとschedule_intervalを利用して計算される。
重要なポイントはschedule_intervalの終了時にDAGが実行される、という点。
また、schedule_intervalはウインドウ枠を表している。
例えば 0 11 * * * であれば、毎日11:00-翌日11:00という時間の幅を表す。
start_date=7月15日、schedule_interval=0 11 * * * のとき、
7月15日 11:00から 7月16日11:00までの期間が終わった後、DAGが開始される。
DAGをデプロイする際、デプロイ日時よりも古いstart_dateを設定することができる。
このとき、start_dateからデプロイ日時までの間で、本来完了しているはずだが実行していない
schedule_intervalについてDAGを実行する機能がcatchup。
catchup=Trueとすると、これらのschedule_intervalが全て再実行の対象となる。
一方、catchup=Falseとsるうと、これらのうち、最後だけが再実行の対象となる。
(Falseとしても、最後の1回は再実行される)
過去のデータを自動投入するとか、危ないので、確認しながら手動実行すべきだと思う。
もし本当にcatchupするのであれば、計画的にFalseからTrueにすべきだろうし、
その時は負荷を許容できる状況としないといけない。
DAGが完了せずにCloud Composer環境のリソースが保持されることや、再試行時に競合が引き起こされることのないよう、 dagrun_timeout を設定します
DAG、タスク、それぞれにタイムアウトプロパティが存在する。それぞれ理解する必要がある。
DAGタイムアウトはdagrun_timeout、タスクタイムアウトはexecution_timeout。
以下が検証コード。job1のexecution_timeout引数をコメントアウトしている。
コメントアウトした状態では、dagrun_timeoutがDAGのタイムアウト時間となる。
検証コードにおいては、タイムアウト時間が15秒のところ、タスクで20秒かかるのでタイムアウトが起きる。
execution_timeout引数のコメントアウトを外すと、DAGのタイムアウト時間が
タスクのタイムアウト時間で上書きされ30秒となる。
タスクで20秒かかってもタイムアウトとならない。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
from datetime import timedelta from airflow.utils.dates import days_ago from airflow import DAG from airflow.operators.python import PythonOperator def wait(**context): time.sleep(20) defalut_args = { "start_date": days_ago(2), "provide_context": True } with DAG( default_args=defalut_args, dagrun_timeout=timedelta(seconds=15), ) as dag: job1 = PythonOperator( task_id='wait_task1', python_callable=wait, # execution_timeout=timedelta(second=30) ) |
ベスプラの言うところは、ちゃんとタイムアウトを設定しろよ、ということだと思う。
インスタンス化で DAG に引数を渡し、すべてのタスクにデフォルトで同じ start_date が設定されるようにします
Airflowでは、Operatorのコンストラクタにstart_dateを与えられるようになっている。
同一DAGに所属するタスクが異なるstart_dateを持つ、という管理が大変なDAGを作ることも出来てしまう。
基本的には、DAGにstart_dateを渡して、タスクのデフォルトを揃えるべき、だそう。
DAG では静的な start_date を使用します。
これがベスプラになっているのはかなり助かる。
動的な start_date を使用した場合、誤った開始日が導き出され、失敗したタスク インスタンスやスキップされた DAG 実行を消去するときにエラーが発生する可能性があります。
retries を、DAG レベルで適用される default_args として設定します。
retriesについても、start_dateと同様にDAGレベルで default_args として設定するそう。
なお、タスクのリトライに関する設定には以下のようなものがある。
- retries (int)
- retry_delay (datetime.timedelta)
- retry_exponential_backoff (bool)
- max_retry_delay (datetime.timedelta)
- on_retry_callback (callable)
retries (int)は、タスクが”失敗”となる前に実行されるリトライ回数。
retry_delay (datetime.timedelta)はリトライ時の遅延時間。
retry_exponential_backoff (bool)はリトライ遅延での指数関数的後退アルゴリズムによるリトライ間隔の待ち時間を増加させるかどうか
max_retry_delay (datetime.timedelta)はリトライ間の最大遅延間隔
on_retry_callback (callable)はリトライ時のコールバック関数
適切な再試行回数は 1~4 回です。再試行回数が多すぎると、Cloud Composer 環境に不要な負荷がかかります。
具体的に retries を何に設定すべきか、について書かれている。
ここまでのまとめ
ここまでのステートメントがコードになっている。
わかりやすい。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator # default_args 辞書を定義して、DAG のデフォルト パラメータ(開始日や頻度など)を指定する default_args = { 'owner': 'me', 'retries': 2, # 最大再試行回数は 2~4 回にすること 'retry_delay': timedelta(minutes=5) } # `with` ステートメントを使用して DAG オブジェクトを定義し、一意の DAG ID と default_args 辞書を指定する with DAG( 'dag_id_v1_0_0', # ID にバージョンを含める default_args=default_args, description='This is a detailed description of the DAG', # 詳しい説明 start_date=datetime(2022, 1, 1), # 静的な開始日 dagrun_timeout=timedelta(minutes=10), # この DAG に固有のタイムアウト is_paused_upon_creation= True, catchup= False, tags=['example', 'versioned_dag_id'], # この DAG に固有のタグ schedule_interval=None, ) as dag: # BashOperator を使用してタスクを定義する task = BashOperator( task_id='bash_task', bash_command='echo "Hello World"' ) |