Airflow自体にDAGの実行結果をテスト(End-To-End Pipeline Tests)する仕組みは無いようで、
以下のような地道な仕組みを自力で作る必要がありそうです。
- テストデータを用意する
- Airflowが提供するAirflow APIを使用してDAGを実行する
- DAGの終了を待つ
- 結果をAssertする
他にAirflow CLIも使えそうですが、pythonコードの一部にするならAPIの方が使い勝手が良さそうです。
API仕様書を上から読んでみたので、その感想を書いてみます。
他にもあるのですが、今回の用途に使いそうなものを抜粋しています。
“読んでみた”だけなので、誤りがあるかもしれません。概要を理解するぐらいの気持ちで読んでください。
Airflow API概要
今日時点のAirflow APIのAPI仕様書は以下です。
RESTful APIとなっていて、Resourceに対するCRUDをHTTP Methodで表現します。
1つ、update_maskという考え方があります。リソースの値を更新する際、リソースjsonと同時に
クエリパラメタで”変更したい値は何か”を渡すことで、リソースjsonの該当値のみを更新できます。
1 2 3 |
resource = request.get('/resource/my-id').json() resource['my_field'] = 'new-value' request.patch('/resource/my-id?update_mask=my_field', data=json.dumps(resource)) |
API Authenticationがusername/passwordで雑ですが、
DAGのis_pausedをtrueにするには、以下の通りpatchを叩くようです。
1 2 3 4 5 6 |
curl -X PATCH 'https://example.com/api/v1/dags/{dag_id}?update_mask=is_paused' \ -H 'Content-Type: application/json' \ --user "username:password" \ -d '{ "is_paused": true }' |
CORSを有効にする必要があります。Enabling CORS
様々なAPI認証が用意されています。API認証はAirflowのauth managerで管理されます。Authentication
エラーはRFC7807準拠です。つまり、Unauthenticated、PermissionDenied、BadRequest、NotFound、MethodNotAllowed、NotAcceptable、AlreadyExistsが扱われます。Errors
Connections
ざっとAPIを眺めていきます。
まずはConnection。順当なCRUDです。patchでupdate_maskが使われます。
コードから一通りConnectionを触れそうです。
Testって何か調べてみました。
デフォルトでdisabledになっていますが、Airflow UI(Connections)から”Test”ボタンを押下できます。
Connectionと関連付けられたhookのtest_connection()メソッドを実行するようです。
これと同等の機能が動くようです。
Method | Endpoint | Overview | Response |
---|---|---|---|
GET | /connections | List Connection | array of objects(ConnectionCollectionItem). |
POST | /connections | Create a Connection | created connection. |
GET | /connections/{connection_id} | Get a connection | connection |
PATCH | /connections/{connection_id} | Update a connection | updated connection |
DELETE | /connections/{connection_id} | Delete a connection | (Status) |
POST | /connections/test | Test a connection | (Status) |
DAG
次はDAG。まずDAG一覧に対する操作。一覧に対してpatchを叩ける様子です。
Method | Endpoint | Overview |
---|---|---|
GET | /dags | List DAGs in the database. dag_id_pattern can be set to match dags of a specific pattern |
PATCH | /dags | Update DAGs of a given dag_id_pattern using UpdateMask. This endpoint allows specifying ~ as the dag_id_pattern to update all DAGs. New in version 2.3.0 |
次は個別のDAGに対する操作。
Method | Endpoint | Overview |
---|---|---|
GET | /dags/{dag_id} | Get basic information about a DAG.Presents only information available in database (DAGModel). If you need detailed information, consider using GET /dags/{dag_id}/details. |
PATCH | /dags/{dag_id} | Update a DAG. |
DELETE | /dags/{dag_id} | Deletes all metadata related to the DAG, including finished DAG Runs and Tasks. Logs are not deleted. This action cannot be undone.New in version 2.2.0 |
GET | /dags/{dag_id}/tasks/detail | Get simplified representation of a task. |
GET | /dags/{dag_id}/detail | Get a simplified representation of DAG.The response contains many DAG attributes, so the response can be large. If possible, consider using GET /dags/{dag_id}. |
Airflowにおいて、Operatorのインスタンスに”Task”という用語が割り当てられています。
つまり、「Operatorに定義した処理を実際に実行すること」が”Task”としてモデリングされています。
「”Task”をA月B日X時Y分Z秒に実行すること」が、”TaskInstance”としてモデリングされています。
あるDAGは、実行日/実行時間ごとの複数の”TaskInstance”を保持しています。
以下のAPIにおいて、DAGが保持する”Task”,”日付レンジ”等を指定して実行します。
“TaskInstance”を”Clear(再実行)”します。また、”TaskInstance”の状態を一気に更新します。
Method | Endpoint | Overview |
---|---|---|
POST | /dags/{dag_id}/clearTaskInstances | Clears a set of task instances associated with the DAG for a specified date range. |
POST | /dags/{dag_id}/updateTaskInstancesState | Updates the state for multiple task instances simultaneously. |
GET | /dags/{dag_id}/tasks | Get tasks for DAG. |
なんだこれ、ソースコードを取得できるらしいです。
Method | Endpoint | Overview |
---|---|---|
GET | /dagSources/{file_token} | Get a source code using file token. |
DAGRun
“Task”と”TaskInstance”の関係と同様に”DAG”と”DAGRun”が関係しています。
「A月B日X時Y分Z秒のDAG実行」が”DAGRun”です。DAGRun。順当な感じです。
新規にトリガしたり、既存のDAGRunを取得して更新したり削除したり、再実行したりできます。
Method | Endpoint | Overview |
---|---|---|
GET | /dags/{dag_id}/dagRuns | List DAG runs.This endpoint allows specifying ~ as the dag_id to retrieve DAG runs for all DAGs. |
POST | /dags/{dag_id}/dagRuns | Trigger a new DAG run.This will initiate a dagrun. If DAG is paused then dagrun state will remain queued, and the task won’t run. |
POST | /dags/~/dagRuns/list | List DAG runs (batch).This endpoint is a POST to allow filtering across a large number of DAG IDs, where as a GET it would run in to maximum HTTP request URL length limit. |
GET | /dags/{dag_id}/dagRuns/{dag_run_id} | Get a DAG run. |
DELETE | /dags/{dag_id}/dagRuns/{dag_run_id} | Delete a DAG run. |
PATCH | /dags/{dag_id}/dagRuns/{dag_run_id} | Modify a DAG run.New in version 2.2.0 |
POST | /dags/{dag_id}/dagRuns/{dag_run_id}/clear | Clear a DAG run.New in version 2.4.0 |
以下はスキップ..
Method | Endpoint | Overview |
---|---|---|
GET | /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents | Get datasets for a dag run.New in version 2.4.0 |
PATCH | /dags/{dag_id}/dagRuns/{dag_run_id}/setNote | Update the manual user note of a DagRun.New in version 2.5.0 |
DAGWarning
DAGのimport_errors一覧を返します。
Method | Endpoint | Overview |
---|---|---|
GET | /dagWarnings | List Dag Waranings. |
DAGStats
A DAG Run status is determined when the execution of the DAG is finished. The execution of the DAG depends on its containing tasks and their dependencies. The status is assigned to the DAG Run when all of the tasks are in the one of the terminal states (i.e. if there is no possible transition to another state) like success, failed or skipped. The DAG Run is having the status assigned based on the so-called “leaf nodes” or simply “leaves”. Leaf nodes are the tasks with no children.
There are two possible terminal states for the DAG Run:
- success if all of the leaf nodes states are either success or skipped,
- failed if any of the leaf nodes state is either failed or upstream_failed.
Method | Endpoint | Overview |
---|---|---|
GET | /dagStats | List Dag statistics. |
ImportError
Airflow Best PractiveのTesting a DagにDAGのテスト観点に関する記述が(サラッと)書かれています。
まず、DAGは普通のpythonコードなので、pythonインタプリタで実行する際にエラーが起きないことを確認すべし、とのことです。
以下の実行により、未解決の依存関係、文法エラーをチェックします。もちろん、どこで実行するかが重要なので、DAG実行環境と合わせる必要があります。
Airflow APIにより、このレベルのエラーがDAGファイルにあるか確認できるようです。
1 |
python your-dag-file.py |
Method | Endpoint | Overview |
---|---|---|
GET | /importErrors | List import errors. |
GET | /importErrors/{import_error_id} | Get an import error. |
Variables
DAGに記述したくないCredentials等を管理する仕組みで、Airflow UIからポチポチ操作すると作れます。
Variableはkey-valueそのままです。DAGからkeyを指定することで参照できます。
Airflow APIからもVariableをCRUDできます。
Method | Endpoint | Overview |
---|---|---|
GET | /variables | List variables.The collection does not contain data. To get data, you must get a single entity. |
POST | /variables | Create a variable. |
GET | /variables/{variable_key} | Get a variable by key. |
PATCH | /variables/{variable_key} | Update a variable by key. |
DELETE | /variables/{variable_key} | Delete a variable by key. |
まとめ
RESTfulAPIが用意されているということは、内部のオブジェクトをCRUD出来るということなのだろう、
という推測のもと、Airflow APIのAPI仕様書を読んで感想を書いてみました。
Airflowの概念と対応するリソースはAPIに出現していて、End-To-End Pipeline Testを書く際に、Assert、実行制御を記述できそうな気持ちになりました。
Assert、実行制御、だけなら、こんなに要らない気もします。
API呼び出し自体の煩雑さがあり、Testの記述量が増えてしまうかもしれません。
以下の記事のようにwrapperを書く必要があるかもしれません。
https://github.com/chandulal/airflow-testing/blob/master/src/integrationtest/python/airflow_api.py
DAGの入力側/出力側Endに対するファイル入出力は別で解決が必要そうです。
「API仕様書を読んでみた」の次の記事が書けるときになったら、再度まとめ記事を書いてみようと思います。