記事・メモ一覧

default eye-catch image.

AirflowでEnd-To-End Pipeline Testsを行うためにAirflow APIを調べてみた話

Airflow自体にDAGの実行結果をテスト(End-To-End Pipeline Tests)する仕組みは無いようで、 以下のような地道な仕組みを自力で作る必要がありそうです。 テストデータを用意する Airflowが提供するAirflow APIを使用してDAGを実行する DAGの終了を待つ 結果をAssertする 他にAirflow CLIも使えそうですが、pythonコードの一部にするならAPIの方が使い勝手が良さそうです。 API仕様書を上から読んでみたので、その感想を書いてみます。 他にもあるのですが、今回の用途に使いそうなものを抜粋しています。 \"読んでみた\"だけなので、誤りがあるかもしれません。概要を理解するぐらいの気持ちで読んでください。 [arst_toc tag=\"h4\"] Airflow API概要 今日時点のAirflow APIのAPI仕様書は以下です。 Airflow API (Stable) (2.10.0) RESTful APIとなっていて、Resourceに対するCRUDをHTTP Methodで表現します。 1つ、update_maskという考え方があります。リソースの値を更新する際、リソースjsonと同時に クエリパラメタで\"変更したい値は何か\"を渡すことで、リソースjsonの該当値のみを更新できます。 resource = request.get(\'/resource/my-id\').json() resource[\'my_field\'] = \'new-value\' request.patch(\'/resource/my-id?update_mask=my_field\', data=json.dumps(resource)) API Authenticationがusername/passwordで雑ですが、 DAGのis_pausedをtrueにするには、以下の通りpatchを叩くようです。 curl -X PATCH \'https://example.com/api/v1/dags/{dag_id}?update_mask=is_paused\' -H \'Content-Type: application/json\' --user \"username:password\" -d \'{ \"is_paused\": true }\' CORSを有効にする必要があります。Enabling CORS 様々なAPI認証が用意されています。API認証はAirflowのauth managerで管理されます。Authentication エラーはRFC7807準拠です。つまり、Unauthenticated、PermissionDenied、BadRequest、NotFound、MethodNotAllowed、NotAcceptable、AlreadyExistsが扱われます。Errors Connections ざっとAPIを眺めていきます。 まずはConnection。順当なCRUDです。patchでupdate_maskが使われます。 コードから一通りConnectionを触れそうです。 Testって何か調べてみました。 デフォルトでdisabledになっていますが、Airflow UI(Connections)から\"Test\"ボタンを押下できます。 Connectionと関連付けられたhookのtest_connection()メソッドを実行するようです。 これと同等の機能が動くようです。 Method Endpoint Overview Response GET /connections List Connection array of objects(ConnectionCollectionItem). POST /connections Create a Connection created connection. GET /connections/{connection_id} Get a connection connection PATCH /connections/{connection_id} Update a connection updated connection DELETE /connections/{connection_id} Delete a connection (Status) POST /connections/test Test a connection (Status) DAG 次はDAG。まずDAG一覧に対する操作。一覧に対してpatchを叩ける様子です。 Method Endpoint Overview GET /dags List DAGs in the database. dag_id_pattern can be set to match dags of a specific pattern PATCH /dags Update DAGs of a given dag_id_pattern using UpdateMask. This endpoint allows specifying ~ as the dag_id_pattern to update all DAGs. New in version 2.3.0 次は個別のDAGに対する操作。 Method Endpoint Overview GET /dags/{dag_id} Get basic information about a DAG.Presents only information available in database (DAGModel). If you need detailed information, consider using GET /dags/{dag_id}/details. PATCH /dags/{dag_id} Update a DAG. DELETE /dags/{dag_id} Deletes all metadata related to the DAG, including finished DAG Runs and Tasks. Logs are not deleted. This action cannot be undone.New in version 2.2.0 GET /dags/{dag_id}/tasks/detail Get simplified representation of a task. GET /dags/{dag_id}/detail Get a simplified representation of DAG.The response contains many DAG attributes, so the response can be large. If possible, consider using GET /dags/{dag_id}. Airflowにおいて、Operatorのインスタンスに\"Task\"という用語が割り当てられています。 つまり、「Operatorに定義した処理を実際に実行すること」が\"Task\"としてモデリングされています。 「\"Task\"をA月B日X時Y分Z秒に実行すること」が、\"TaskInstance\"としてモデリングされています。 あるDAGは、実行日/実行時間ごとの複数の\"TaskInstance\"を保持しています。 以下のAPIにおいて、DAGが保持する\"Task\",\"日付レンジ\"等を指定して実行します。 \"TaskInstance\"を\"Clear(再実行)\"します。また、\"TaskInstance\"の状態を一気に更新します。 Method Endpoint Overview POST /dags/{dag_id}/clearTaskInstances Clears a set of task instances associated with the DAG for a specified date range. POST /dags/{dag_id}/updateTaskInstancesState Updates the state for multiple task instances simultaneously. GET /dags/{dag_id}/tasks Get tasks for DAG. なんだこれ、ソースコードを取得できるらしいです。 Method Endpoint Overview GET /dagSources/{file_token} Get a source code using file token. DAGRun \"Task\"と\"TaskInstance\"の関係と同様に\"DAG\"と\"DAGRun\"が関係しています。 「A月B日X時Y分Z秒のDAG実行」が\"DAGRun\"です。DAGRun。順当な感じです。 新規にトリガしたり、既存のDAGRunを取得して更新したり削除したり、再実行したりできます。 Method Endpoint Overview GET /dags/{dag_id}/dagRuns List DAG runs.This endpoint allows specifying ~ as the dag_id to retrieve DAG runs for all DAGs. POST /dags/{dag_id}/dagRuns Trigger a new DAG run.This will initiate a dagrun. If DAG is paused then dagrun state will remain queued, and the task won\'t run. POST /dags/~/dagRuns/list List DAG runs (batch).This endpoint is a POST to allow filtering across a large number of DAG IDs, where as a GET it would run in to maximum HTTP request URL length limit. GET /dags/{dag_id}/dagRuns/{dag_run_id} Get a DAG run. DELETE /dags/{dag_id}/dagRuns/{dag_run_id} Delete a DAG run. PATCH /dags/{dag_id}/dagRuns/{dag_run_id} Modify a DAG run.New in version 2.2.0 POST /dags/{dag_id}/dagRuns/{dag_run_id}/clear Clear a DAG run.New in version 2.4.0 以下はスキップ.. Method Endpoint Overview GET /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents Get datasets for a dag run.New in version 2.4.0 PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/setNote Update the manual user note of a DagRun.New in version 2.5.0 DAGWarning DAGのimport_errors一覧を返します。 Method Endpoint Overview GET /dagWarnings List Dag Waranings. DAGStats A DAG Run status is determined when the execution of the DAG is finished. The execution of the DAG depends on its containing tasks and their dependencies. The status is assigned to the DAG Run when all of the tasks are in the one of the terminal states (i.e. if there is no possible transition to another state) like success, failed or skipped. The DAG Run is having the status assigned based on the so-called “leaf nodes” or simply “leaves”. Leaf nodes are the tasks with no children. There are two possible terminal states for the DAG Run: success if all of the leaf nodes states are either success or skipped, failed if any of the leaf nodes state is either failed or upstream_failed. Method Endpoint Overview GET /dagStats List Dag statistics. ImportError Airflow Best PractiveのTesting a DagにDAGのテスト観点に関する記述が(サラッと)書かれています。 まず、DAGは普通のpythonコードなので、pythonインタプリタで実行する際にエラーが起きないことを確認すべし、とのことです。 以下の実行により、未解決の依存関係、文法エラーをチェックします。もちろん、どこで実行するかが重要なので、DAG実行環境と合わせる必要があります。 Airflow APIにより、このレベルのエラーがDAGファイルにあるか確認できるようです。 $ python your-dag-file.py Method Endpoint Overview GET /importErrors List import errors. GET /importErrors/{import_error_id} Get an import error. Variables DAGに記述したくないCredentials等を管理する仕組みで、Airflow UIからポチポチ操作すると作れます。 Variableはkey-valueそのままです。DAGからkeyを指定することで参照できます。 Airflow APIからもVariableをCRUDできます。 Method Endpoint Overview GET /variables List variables.The collection does not contain data. To get data, you must get a single entity. POST /variables Create a variable. GET /variables/{variable_key} Get a variable by key. PATCH /variables/{variable_key} Update a variable by key. DELETE /variables/{variable_key} Delete a variable by key. まとめ RESTfulAPIが用意されているということは、内部のオブジェクトをCRUD出来るということなのだろう、 という推測のもと、Airflow APIのAPI仕様書を読んで感想を書いてみました。 Airflowの概念と対応するリソースはAPIに出現していて、End-To-End Pipeline Testを書く際に、Assert、実行制御を記述できそうな気持ちになりました。 Assert、実行制御、だけなら、こんなに要らない気もします。 API呼び出し自体の煩雑さがあり、Testの記述量が増えてしまうかもしれません。 以下の記事のようにwrapperを書く必要があるかもしれません。 https://github.com/chandulal/airflow-testing/blob/master/src/integrationtest/python/airflow_api.py DAGの入力側/出力側Endに対するファイル入出力は別で解決が必要そうです。 「API仕様書を読んでみた」の次の記事が書けるときになったら、再度まとめ記事を書いてみようと思います。

default eye-catch image.

CustomOperatorのUnitTestを理解するためGCSToBigQueryOperatorのUnitTestを読んでみた話

未知の連携先との入出力を行う際、CustomOperatorを作るという解決策があります。 CustomOperatorを自作した場合、そのテストをどう書くか、という問題が発生します。 ビルトインのGCSToBigQueryOperatorがどうテストされているかを読むと、雰囲気がわかりました。 UnitTestコードを読んで見ましたので、本記事で感想を書いてみます。 https://github.com/apache/airflow/blob/main/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py 前提となる知識 Airflowのhookについて理解する必要がありました。 フワッとしていますが、コードを読んで使われ方をながめているとイメージが湧いてきます。 hook しばしば外部からデータを入力したり外部へデータを出力する必要が出てくる。 外部と接続する際にcredentialsを保管し使用する必要があるが、 Airflowはconnectionという概念のオブジェクトを用意している。 connection は conn_id により識別される。Airflow UIやCLIから管理できる。 connectionを直接操作するようなlow-levelコードを書くこともできるが、 煩雑にならないよう、外部リソース毎にhookというhigh-levelインターフェースが用意されている。 Connections & Hooks pythonのunittestも理解する必要がありました。 unittestのmockについて以下が参考になりました。 [clink implicit=\"false\" url=\"https://qiita.com/satamame/items/1c56e7ff3fc7b2986003\" imgurl=\"https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcQqktBhX0kv-C4zk1lu0D8T0ExDUFQdNdu9dQ&s\" title=\"Python の unittest の mock\" excerpt=\"Python の unittest を使っていて、mock が何をするものかは分かっているけど、まだちょっと得体が知れない、怖い、という段階があると思います。この段階を克服するために、何も知らない状態から徐々に mock を理解するためのステップを作りたいと思いました。対象は Python 3.x です。\"] UnitTestを読んでいく TestGCSToBigQueryOperatorというクラスにUnitTestメソッドの実装例が書かれています。 python built-inのテストパッケージであるunittestが使用されています。 @mock.patchデコレータを使用しBigQueryHookをpatchしています。 BigQueryHookのmockインスタンスがhookとして渡ります。 hookのreturn_value, side_effectを差し替えてGCSToBigQueryOperatorインスタンスを実行します。 insert_job(),generate_job_id(),split_table_name(),get_job()の差し替えを行なっています。 メソッドの階層をドット(.)で繋いでより深い場所を差し替えられる様子です。 unittestを書いた人はコードが何に依存しているか分かるので、知識に基づいて依存しているものをmockします。 import json from unittest import mock from unittest.mock import MagicMock, call TASK_ID = \"test-gcs-to-bq-operator\" TEST_EXPLICIT_DEST = \"test-project.dataset.table\" WRITE_DISPOSITION = \"WRITE_TRUNCATE\" SCHEMA_FIELDS = [ {\"name\": \"id\", \"type\": \"STRING\", \"mode\": \"NULLABLE\"}, {\"name\": \"name\", \"type\": \"STRING\", \"mode\": \"NULLABLE\"}, ] MAX_ID_KEY = \"id\" JOB_PROJECT_ID = \"job-project-id\" TEST_BUCKET = \"test-bucket\" TEST_SOURCE_OBJECTS = \"test/objects/test.csv\" DATASET = \"dataset\" TABLE = \"table\" GCS_TO_BQ_PATH = \"airflow.providers.google.cloud.transfers.gcs_to_bigquery.{}\" job_id = \"123456\" hash_ = \"hash\" REAL_JOB_ID = f\"{job_id}_{hash_}\" class TestGCSToBigQueryOperator: @mock.patch(GCS_TO_BQ_PATH.format(\"BigQueryHook\")) def test_max_value_external_table_should_execute_successfully(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=REAL_JOB_ID, error_result=False), REAL_JOB_ID, ] hook.return_value.generate_job_id.return_value = REAL_JOB_ID hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) hook.return_value.get_job.return_value.result.return_value = (\"1\",) operator = GCSToBigQueryOperator( task_id=TASK_ID, bucket=TEST_BUCKET, source_objects=TEST_SOURCE_OBJECTS, destination_project_dataset_table=TEST_EXPLICIT_DEST, write_disposition=WRITE_DISPOSITION, schema_fields=SCHEMA_FIELDS, max_id_key=MAX_ID_KEY, external_table=True, project_id=JOB_PROJECT_ID, ) \"基づく知識\"は第三者には理解不能ですが、GCSToBigQueryOperator.pyを読むと理由がわかります。 GCSToBigQueryOperatorのexecute(self, context:Context)を読むと、 先頭でBigQueryHookのインスタンスを取得し、BaseOperator由来のself.hookに設定しているようです。 generate_job_id()により、job_idを取得しています。 _use_existing_table()内で、split_table_name()により,ProjectID,Dataset,Tableを取得しています。 mockしたjob_idが既に存在している場合、get_job()で既存を取得しています。 def execute(self, context: Context): hook = BigQueryHook( gcp_conn_id=self.gcp_conn_id, location=self.location, impersonation_chain=self.impersonation_chain, ) self.hook = hook self.source_format = self.source_format.upper() job_id = self.hook.generate_job_id( job_id=self.job_id, dag_id=self.dag_id, task_id=self.task_id, logical_date=context[\"logical_date\"], configuration=self.configuration, force_rerun=self.force_rerun, ) さて、Assertは以下のように書かれています。 GCSToBigQueryOperatorは、Source(GCS)から.csv等を読み込みDest(BigQuery)へ配置するものです。 Destの然るべき場所にテーブルが作られ、値が入ります。 execute()すると、max_id_keyで指定したカラムの最大値が戻るようです。 \"test-bucket\"に配置した\"test/objects/test.csv\"は\"id\",\"name\"の2列からなるCSVで、 例えば\"id\"=\"1\", \"name\"=\"hoge\"ならば、\"id\"列の最大値である1が戻るため、1をassertすればOKです。 result = operator.execute(context=MagicMock()) assert result == \"1\" これだと、分岐をだいぶすっ飛ばしているので、だいぶ薄いカバレッジになるかと思います。 まとめ GCSToBigQueryOperatorのUnitTestを読んでみました。分かってしまうと普通のUnitTestでした。 Source to Destのパターンはだいたい似たようになるのかも、なので、 作るUnitTestも似たような感じになるのかもしれません。

default eye-catch image.

GoogleによるAirflow DAG実装のベスプラ集を読んでみた – その1

GoogleによるフルマネージドAirflow環境であるCloud Composerを使う必要があり、 いそぎでAirflow+Cloud Composerをキャッチアップすることになりました。 Googleが公開するベスプラ集があることを知り、読んでみることにしました。 Cloud Composerと題されていいますが、ほぼAirflowと読み替えて良いのかなと思います。 書かれているのは、少し基本的なシナリオだと思います。 経験に裏付けられたゴリゴリの集合知、というものはタダでは手に入らないのだろうと思います。 スタート地点に立つ際の道しるべ、ぐらいの気持ちです。 おそらく一緒に使うシナリオが多いであろう、データ変換ツールのdbtと競合するものがあります。 大構造としてはAirflow DAGの下にdbt DAGが来るため、Airflow DAGのベスプラを実現する前提で dbt DAGを書いていくものと考えていました。これだけだとバッティングすると思います。 ウェアハウスとは切り離されています。特にBigQueryを前提にするならもう少し踏み込んだ内容と なるはずだと思いますが、ちょっと書かれていないようです。 いったん半分くらい読んでみたので読書感想文を書いてみました。 [clink implicit=\"false\" url=\"https://cloud.google.com/blog/ja/products/data-analytics/optimize-cloud-composer-via-better-airflow-dags\" imgurl=\"https://www.gstatic.com/pantheon/images/welcome/supercloud.svg\" title=\"Airflow DAG の改良による Cloud Composer の最適化\" excerpt=\"このガイドには、Apache Airflow DAG を作成する際に一般的に適用できる、作業項目のチェックリストが掲載されています。これらのチェック項目は、Google Cloud とオープンソース コミュニティが判断したベスト プラクティスに沿ったものとなっています。一連の高パフォーマンスの DAG によって Cloud Composer の動作が最適化され、標準化された作成方法によって、デベロッパーが数百個、あるいは数千個の DAG でも管理できるようになります。チェック項目のそれぞれが、Cloud Composer 環境と開発プロセスにメリットをもたらします。\"] [arst_toc tag=\"h4\"] はじめに ファイル名を標準化します Workflowの特徴を個別に表現するだけでは不十分で、ファイル名が含む部分文字列がサブ機能や属性のインデックスとなっていて欲しい。さらに、ファイル名から機能概要を逆引き推測できたら便利。 作成した DAG ファイルのコレクションを他のデベロッパーが容易に参照できるようにするためです。例: team_project_workflow_version.py DAG は決定的でなければなりません 入力となるデータが同じであれば、出力は常に同じであるべき、という観点だと思う。 例えば、入力となるデータは同じであっても、実行時間に依存して処理を行ってしまうと、 出力が時間依存となる。テストが無茶苦茶大変になるだろうなと思う。 Airflow DAG単体であれば、そう理解に難しくないポイントだとは思う。 しかし、dbt DAGを含めると一気に縛りがキツくなると思う。 特定の入力によって常に同じ出力が生成される必要があります DAG はべき等でなければなりません 大雑把に書けば、ある操作を1回行っても数回行っても結果が同じであることを言う。 これを実現する仕組みの選択は結構悩ましく、「追加する範囲をいったん削除追加する」が簡単。 しかし、この方法だと無駄なスキャン量が発生する。 dbtを使用する場合、incremental modelがべき等性担保の手段として挙げられることが多いが、 実際にべき等性を担保するには考慮しないといけないことがある。 こちらの記事(dbtで「Incremental」を使わずに冪等性を担保する方法について)が詳しい。 DAG を何度トリガーしても、毎回同じ効果 / 結果が得られなければなりません 例えば、以下のように書けば、入力テーブルが変わらない限りべき等となる。 これを行うには、入力テーブルに「ロード日時」といったメタデータが必要となる。 {{ config( materialized=\"incremental\" ) }} ・・・ {%- if is_incremental() %} WHERE ORDERDATE = TO_DATE(\'{{ var(\'load_date\') }}\') {%- endif %} タスクはアトミック、かつ、べき等でなければなりません ちょっと何言っているかわからない書きっぷり。データベースのACID特性のAtomic性を意識する。 ある操作が一連の処理の完了によって達成される場合、部分的に処理の一部が成功する、という 状態になってしまってはいけない。全ての処理が成功したか、全ての処理が失敗したか、のどちらか、 になっていないといけない。 タスクごとに、他のオペレーションとは独立して再実行できる 1つのオペレーションを処理するようにします。タスクがアトミックな場合、そのタスクの一部が成功したことは、タスク全体が成功したことを意味します。 可能な限り単純な DAG にします ちょっと一般的すぎて良くわからない。 「スケジューリングのコスト」って、「実行のコスト」よりもだいぶ小さいんじゃなかろうか、と思うが、 それでも意識しないといけないのだろうか。 ネストされたツリー構造は、そもそも理解しづらくて避けるべきだろう、とは思う。 タスク間の依存関係が少ない単純な DAG にすると、オーバーヘッドが少なくなるため、スケジューリングのパフォーマンスが向上する傾向があります。一般的に、多数の依存関係がある深くネストされたツリー構造よりも線形構造(例: A->B->C)にしたほうが、効率的な DAG になります。 Python の docstring 規則に従って、各関数のファイルの上部にドキュメントを記述してください AirflowはPythonで書けることが最大の特徴なので、そのメリットを発揮するため、docstringでコメント書けよと。 Python の docstring 規則は、他のデベロッパーやプラットフォーム エンジニアが Airflow DAG を理解するために役立ちます。 関数と同様に BashOperator のドキュメントも作成するようにしてください。DAG で bash スクリプトを参照している場合、スクリプトの目的を記したドキュメントがないと、このスクリプトに詳しくないデベロッパーにはトラブルシューティングが困難です。 DAG の作成を標準化する default_args にオーナーを追加します。 タスクを得る(Operatorをインスタンス化する)際に、各Operatorのコンストラクタに引数を与えるが、 複数のOperatorに渡す引数を共通化したい場合には、default_argsをDAG()に与える。 こうすると、Operatorにdefault_argsで設定した引数を与えたことになる。 各Operatorに引数を与えると、defautl_argsをオーバーライドする動作となる。 過去の公式でdefault_argsは、task_id と owner が mandatory(必須) であるとされている。 これについて、Why is \'owner\' a mandatory argument for tasks and dags? という記事がある。 それに続くPRは More detail on mandatory task arguments であり、mandatoryの根拠を聞いている。 歴史的な理由による、で片付いているな。ベスプラではownerに実装者のメアドなどを書けという。 実装担当者を明らかにせよ、という話であればcomitterを見れば良いだけで、ちょっと意味不明。 mandatoryなので、何か入れないといけないなら、とりあえず実装者のメアドを入れておけ、ということか。 import pendulum with DAG( dag_id=\'my_dag\', start_date=pendulum.datetime(2016, 1, 1, tz=\"UTC\"), schedule_interval=\'@daily\', catchup=False, default_args={\'owner\': \'hoge@ikuty.com\'}, ) as dag: op = BashOperator(task_id=\'dummy\', bash_command=\'Hello World!\') print(op.retries) # 2 dag = DAG() ではなく with DAG() as dag を使用します。 Pythonのwith文の仕様。コンテキストマネージャという言う。 try... except... finally をラップするため、リソースの確保と対応する解放が必ず行われる。 with DAG(...):文の下のインデントの中では、各Operatorのコンストラクタにdagインスタンスを 渡さなくてよくなる。 すべてのオペレーターまたはタスクグループに DAG オブジェクトを渡す必要がなくなるようにします。 DAG ID 内にバージョンを設定します。 以下とのこと。あぁ..バージョニングが実装されていないので手動でバージョニングを行うべし、と。 AirflowはDAGをファイルIDで管理しているため、ファイルIDを変更するとUI上、別のDAGとして扱われるよう。 積極的にDAG IDを変更して、Airflow UIに無駄な情報を出さないようにする、というアイデア。 DAG 内のコードを変更するたびにバージョンを更新します。 こうすると、削除されたタスクログが UI に表示されなくなることや、ステータスのないタスクが古い DAG 実行に対して生成されること、DAG 変更時の一般的な混乱を防ぐことができます。 Airflow オープンソースには、将来的にバージョニングが実装される予定です。 DAG にタグを追加します。 公式はこちら。Add tags to DAGs and use it for filtering in the UI 単にUI上の整理のために留まらず、処理の記述に積極的に使うことができる様子。 これを無秩序に使うと扱いづらくなりそうなので、使う場合は用途を明確にしてから使うべきかと思う。 1. デベロッパーがタグをフィルタして Airflow UI をナビゲートできるようにします。 2. 組織、チーム、プロジェクト、アプリケーションなどを基準に DAG をグループ化します。 DAG の説明を追加します。 唐突で非常に当たり前なのだが、あえて宣言することが大事なんだろうと思う。 他のデベロッパーが自分の DAG の内容を理解できるようにします。 作成時には DAG を一時停止します。 なるほど。 こうすると、誤って DAG が実行されて Cloud Composer 環境の負荷が増すという事態を回避できます。 catchup=Falseに設定して自動キャッチアップによるCloud Composer環境の過負荷を避けます。 まず、catchupの前に、Airflowの実行タイミングが直感的でなさそう。 こちらの記事がとても参考になった。【Airflow】DAG実行タイミングを改めて纏めてみた DAGの実行タイミングはstart_dateとschedule_intervalを利用して計算される。 重要なポイントはschedule_intervalの終了時にDAGが実行される、という点。 また、schedule_intervalはウインドウ枠を表している。 例えば 0 11 * * * であれば、毎日11:00-翌日11:00という時間の幅を表す。 start_date=7月15日、schedule_interval=0 11 * * * のとき、 7月15日 11:00から 7月16日11:00までの期間が終わった後、DAGが開始される。 DAGをデプロイする際、デプロイ日時よりも古いstart_dateを設定することができる。 このとき、start_dateからデプロイ日時までの間で、本来完了しているはずだが実行していない schedule_intervalについてDAGを実行する機能がcatchup。 catchup=Trueとすると、これらのschedule_intervalが全て再実行の対象となる。 一方、catchup=Falseとsるうと、これらのうち、最後だけが再実行の対象となる。 (Falseとしても、最後の1回は再実行される) 過去のデータを自動投入するとか、危ないので、確認しながら手動実行すべきだと思う。 もし本当にcatchupするのであれば、計画的にFalseからTrueにすべきだろうし、 その時は負荷を許容できる状況としないといけない。 DAGが完了せずにCloud Composer環境のリソースが保持されることや、再試行時に競合が引き起こされることのないよう、 dagrun_timeout を設定します DAG、タスク、それぞれにタイムアウトプロパティが存在する。それぞれ理解する必要がある。 DAGタイムアウトはdagrun_timeout、タスクタイムアウトはexecution_timeout。 以下が検証コード。job1のexecution_timeout引数をコメントアウトしている。 コメントアウトした状態では、dagrun_timeoutがDAGのタイムアウト時間となる。 検証コードにおいては、タイムアウト時間が15秒のところ、タスクで20秒かかるのでタイムアウトが起きる。 execution_timeout引数のコメントアウトを外すと、DAGのタイムアウト時間が タスクのタイムアウト時間で上書きされ30秒となる。 タスクで20秒かかってもタイムアウトとならない。 from datetime import timedelta from airflow.utils.dates import days_ago from airflow import DAG from airflow.operators.python import PythonOperator def wait(**context): time.sleep(20) defalut_args = { \"start_date\": days_ago(2), \"provide_context\": True } with DAG( default_args=defalut_args, dagrun_timeout=timedelta(seconds=15), ) as dag: job1 = PythonOperator( task_id=\'wait_task1\', python_callable=wait, # execution_timeout=timedelta(second=30) ) ベスプラの言うところは、ちゃんとタイムアウトを設定しろよ、ということだと思う。 インスタンス化で DAG に引数を渡し、すべてのタスクにデフォルトで同じ start_date が設定されるようにします Airflowでは、Operatorのコンストラクタにstart_dateを与えられるようになっている。 同一DAGに所属するタスクが異なるstart_dateを持つ、という管理が大変なDAGを作ることも出来てしまう。 基本的には、DAGにstart_dateを渡して、タスクのデフォルトを揃えるべき、だそう。 DAG では静的な start_date を使用します。 これがベスプラになっているのはかなり助かる。 動的な start_date を使用した場合、誤った開始日が導き出され、失敗したタスク インスタンスやスキップされた DAG 実行を消去するときにエラーが発生する可能性があります。 retries を、DAG レベルで適用される default_args として設定します。 retriesについても、start_dateと同様にDAGレベルで default_args として設定するそう。 なお、タスクのリトライに関する設定には以下のようなものがある。 retries (int) retry_delay (datetime.timedelta) retry_exponential_backoff (bool) max_retry_delay (datetime.timedelta) on_retry_callback (callable) retries (int)は、タスクが\"失敗\"となる前に実行されるリトライ回数。 retry_delay (datetime.timedelta)はリトライ時の遅延時間。 retry_exponential_backoff (bool)はリトライ遅延での指数関数的後退アルゴリズムによるリトライ間隔の待ち時間を増加させるかどうか max_retry_delay (datetime.timedelta)はリトライ間の最大遅延間隔 on_retry_callback (callable)はリトライ時のコールバック関数 適切な再試行回数は 1~4 回です。再試行回数が多すぎると、Cloud Composer 環境に不要な負荷がかかります。 具体的に retries を何に設定すべきか、について書かれている。 ここまでのまとめ ここまでのステートメントがコードになっている。 わかりやすい。 import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator # default_args 辞書を定義して、DAG のデフォルト パラメータ(開始日や頻度など)を指定する default_args = { \'owner\': \'me\', \'retries\': 2, # 最大再試行回数は 2~4 回にすること \'retry_delay\': timedelta(minutes=5) } # `with` ステートメントを使用して DAG オブジェクトを定義し、一意の DAG ID と default_args 辞書を指定する with DAG( \'dag_id_v1_0_0\', # ID にバージョンを含める default_args=default_args, description=\'This is a detailed description of the DAG\', # 詳しい説明 start_date=datetime(2022, 1, 1), # 静的な開始日 dagrun_timeout=timedelta(minutes=10), # この DAG に固有のタイムアウト is_paused_upon_creation= True, catchup= False, tags=[\'example\', \'versioned_dag_id\'], # この DAG に固有のタグ schedule_interval=None, ) as dag: # BashOperator を使用してタスクを定義する task = BashOperator( task_id=\'bash_task\', bash_command=\'echo \"Hello World\"\' )

default eye-catch image.

Snowpark Container Services上でWebアプリ(FastAPI/React/TypeScript)を動かしてみた

シンプルな Multi-Container App を動かしている以下の記事にインスパイアされてみました。 以下の記事では、Docker networkを前提にフロントがサーバの名前解決を行っています。 これをデプロイすると、ブラウザで動くフロントコードがサーバの名前を解決できません(SPCS無関係)。 リバースプロキシを挟んでプライベートなダウンストリームにAPIを配置する方法が良さそうです。 今回の記事はSPCSの動作確認をすることが目的なので凝ったことはせず、 ViteをそのままデプロイしてProxyで解決してみたのでご紹介します。 [clink implicit=\"false\" url=\"https://medium.com/@maseedilyas9848/snowflake-container-mastery-step-by-step-deployment-of-your-multi-container-app-with-snowpark-211682514851\" imgurl=\"https://miro.medium.com/v2/resize:fit:1400/format:webp/1*t7s-Rl6F4BBV-yYs-ovODQ.png\" title=\"Snowflake Container Mastery: Step-by-Step Deployment of Your Multi-Container App with Snowpark Container Services\" excerpt=\"The buzz around town is all about Snowflake’s latest product feature, “Snowpark Container Services” and the excitement is real. Now, with the feature hitting public preview in various AWS regions, this blog dives into the nitty-gritty of what container services bring to the table. Join me as we explore what makes this feature tick and unravel the steps to deploy a multi-container app within Snowflake. Let’s break it down!\"] [arst_toc tag=\"h4\"] SPCSのアーキテクチャ Image Registryはイメージリポジトリです。AWSのECR、AzureのACRと似た感じで操作できます。 Container Serviceはデプロイメントの単位で、1個以上のコンテナをデプロイできます。 Compute Poolは計算資源で、複数のContainer Serviceが共有します。 Serviceは基本的にはPrivateなリソースとなりますが、SpecでEndpointsを定義することで、 Publicリソースとすることができます。自動的に80にmapされます。 Serviceには以下のフォーマットでDNS名が付きます。 Service同士は以下のDNS使ってプライベート通信できます。 同一DB、Schema内に作成したServiceは先頭のServiceNameが異なるだけとなりますが、 その場合に限り、ServiceNameだけで互いの名前解決ができます。 Service間連携については、公式で\"Service-to-Service\"というパターンで紹介されています。 <Service Name>-<Schema Name>-<DB Name>.snowflakecomputing.internal 1つのService内に複数のコンテナを配置することができます。 同一Service内で各コンテナ内の通信したいと考えたのですが、方法を見つけることができませんでした。 (出来ないはずはなさそうで方法はあるのかもしれません..) 今回作るもの フロント側、サーバ側の2つのコンテナを、それぞれ別々のServiceとしてデプロイします。 フロント側をPublic、サーバ側はPrivateとします。概要は以下の通りです。 フロント側 Vite React/TypeScript ReactからのAPIリクエストは一旦ViteのProxyで受けて、ProxyからAPIに流す サーバ側にGETリクエストして応答を表示するだけ サーバ側 uvicorn FastAPI poetry GETリクエストを受けて\"Hello World\"をJSONで返すだけ 思いっきり開発用な感じですが、SPCSの動作確認が目的ですのでこれでいきます。 ローカルで動作確認をして、SPCSにデプロイします。ファイル構成は以下の通りです。 $ tree . -I node_modules -I __pycache__ . ├── api │   ├── Dockerfile │   ├── api.py │   ├── app.py │   ├── main.py │   └── pyproject.toml ├── compose.yml └── front ├── Dockerfile ├── entry.sh ├── index.html ├── package-lock.json ├── package.json ├── src │   └── hello.tsx ├── tsconfig.json ├── tsconfig.node.json └── vite.config.ts compose.yml サーバ側、フロント側の2つのコンテナが、サーバ側->フロント側の順に起動するように書きます。 それぞれ、8080、5173 で待つようにします。 services: api: build: api volumes: - ./api:/app ports: - 8080:8080 front: build: front volumes: - ./front:/app ports: - 5173:5173 depends_on: - api サーバ側 Snowflakeの公式のチュートリアルだとFlaskが使われています。 今回は、最近使い始めたFastAPIを使ってAPIサーバを立ててみようと思います。 FlaskとFastAPIの比較はこちらが詳しいです。 FastAPIの特徴はPydanticによるデータ検証とAsyncI/O。TypeScriptのように型チェックできます。 パッケージマネージャにはpoetryを使います。デファクトが無いPythonのパッケージ管理界隈で npmやcomposer的な使い勝手が提供されます。 FastAPIの公式では、Python用Webサーバのuvicornを使ってホストされています。 uvicornでFastAPIを動かすコンテナを1個立てていきます。 Dockerfile 最新のPythonのイメージにpoetryをインストールします。 公式がインストーラを配布していて公式の手順通りに叩けばインストールできます。 公式のガイドの通り、POETRY_HOME=/etc/poetry とします。 Installation with the official installer FROM python:3.12 # 公式の通り /etc/poetry にインストールする ENV POETRY_HOME=/etc/poetry RUN curl -sSL https://install.python-poetry.org | python - ENV PATH $POETRY_HOME/bin:$PATH WORKDIR /app COPY . . RUN poetry install CMD [\"python\",\"main.py\"] pyproject.toml バニラのPythonとpyproject.tomlだけで依存関係を考慮したパッケージ管理が出来ますが、 要はnpmやbundle,composer的な使い勝手に寄せたパッケージ管理に対する需要があります。 poetry用の依存関係を書いていきます。fastapi、uvicornの現在(2/16)の最新を指定します。 [tool.poetry] name = \"test\" [tool.poetry.dependencies] python = \"^3.12\" fastapi = \"^0.109.2\" uvicorn = \"^0.27.1\" api.py [GET] /hello に対して Hello World 的な JSON を返す FastAPI の Hello Worldです。 後のapp.pyで FastAPIインスタンスに紐付けます。app.pyと分離することでロジックを分離できます。 from fastapi import APIRouter router = APIRouter() @router.get(\"/hello\") async def hoge(): return {\"result\":\"Hello World\"} app.py FastAPIのHello Worldコードです。Dockerfileから開始します。 from api import router as api_router from fastapi import FastAPI app = FastAPI() app.include_router(api_router, prefix=\"/api\") main.py pythonコードからuvicornを起動します。uvicorn.runの公式の仕様はこちら。 第1引数の\"app:app\"は\"app\"モジュールの中の\"app\"オブジェクトという表記です。 app.pyに記述したFastAPI()のインスタンスappを指します。stringで渡す必要があります。 hostは\"0.0.0.0\"を指定します。なぜ\"127.0.0.1\"でないのかはこちらが参考になります。 今回はPort=8080で起動します。reload=Trueとすると、HotReload機能が有効になります。便利。 import uvicorn if __name__ == \"__main__\": uvicorn.run( \"app:app\", host=\"0.0.0.0\", port=8080, reload=True, ) 起動してみる docker compose up して http://localhost:8080/docs を開くと以下が表示されます。 ちゃんとJSONでHello Worldが戻りました。 フロント側 VueとReactの開発用に使われるローカル開発用のサーバ Vite をホストするコンテナを立てます。 Viteは Vue.jsの開発者Evan You氏が開発したJavaScript/TypeScriptで、ヴィートと読み、 フランス語で\"素早い\"という意味だそう。(webpackのように)リソースバンドルが不要で起動が速い。 (Laravelも9.xでwebpackを捨ててViteになってた..) 素の状態でTypeScriptを扱えるため、すぐにTypeScriptを書き始められる特徴があります。 Dockerfile nodeのrelease scheduleはこちら。 2/17のnodeのActiveLTSのMajor Versionは20で、2026-04-30がEnd of lifeとなっています。 これを使いたいので node:20 を指定します。 FROM node:20 WORKDIR /app COPY . . RUN npm install ENTRYPOINT [ \"./entry.sh\" ] entry.sh Dockerfile の ENTRYPOINT で npm run dev するだけのshです。 #!/bin/bash npm run dev package.json npm create vite@latest で Prjディレクトリ内に様々なファイルが作られます。 package.jsonも作られます。 Hello World で必要なもの以外を削ってみました。 npm install後、npm run devで viteを実行します。 TS用のconfigは別です。 他に生成されるpackage-lock.jsonが必要ですが省略します。 { \"name\": \"front\", \"private\": true, \"version\": \"0.0.0\", \"scripts\": { \"dev\": \"vite\" }, \"dependencies\": { \"react\": \"^18.2.0\", \"react-dom\": \"^18.2.0\" }, \"devDependencies\": { \"@vitejs/plugin-react\": \"^4.2.1\" } } tsconfig.json viteのPrj生成で自動生成されるTS用のconfigファイルです。 手をつけずに配置します。 { \"compilerOptions\": { \"target\": \"ES2021\", \"useDefineForClassFields\": true, \"lib\": [\"ES2021\", \"DOM\", \"DOM.Iterable\"], \"module\": \"ESNext\", \"skipLibCheck\": true, /* Bundler mode */ \"moduleResolution\": \"bundler\", \"allowImportingTsExtensions\": true, \"resolveJsonModule\": true, \"isolatedModules\": true, \"noEmit\": true, \"jsx\": \"react-jsx\", /* Linting */ \"strict\": true, \"noUnusedLocals\": true, \"noUnusedParameters\": true, \"noFallthroughCasesInSwitch\": true }, \"include\": [\"src\"], \"references\": [{ \"path\": \"./tsconfig.node.json\" }] } tsconfig.node.json viteのPrj生成で自動生成されるTS用のconfigファイルです。 手をつけずに配置します。 { \"compilerOptions\": { \"composite\": true, \"skipLibCheck\": true, \"module\": \"ESNext\", \"moduleResolution\": \"bundler\", \"allowSyntheticDefaultImports\": true }, \"include\": [\"vite.config.ts\"] } vite.config.ts viteのPrj生成で自動生成されるvite用のconfigファイルです。 設定ファイルが.tsなところが凄いです。普通にimport文を書けます。 上のuvicornの起動で127.0.0.1ではなく0.0.0.0を指定したのと同様に、 viteも127.0.0.1ではなく0.0.0.0で待たせる必要があります。 serverオプションのhostにtrueを設定すると、0.0.0.0となります。公式 FastAPIの同一パスと対応するProxyを設定します。 以下で、server.proxy.api.target は SPCS上のAPIコンテナのPrivateエンドポイント を表します。 DNS名はサービス単位で作られます。本来長いFQDNを指定する必要がありますが、 同一スキーマに作られたサービスに限り、サービス名だけで解決できるようです。 DNS名はアンダースコア(_)がハイフン(-)に置き換わります。6時間くらいハマりました.. 後で ikuty_api_service サービスを作りますが、ikuty-api-serviceを 使います。 詳細は以下を参照してください。 Service-to-service communications import { defineConfig } from \'vite\' import react from \'@vitejs/plugin-react\' export default defineConfig({ plugins: [react()], server: { host: true, proxy: { \"/api\": { target: `http://ikuty-api-service:8080/`, changeOrigin: true } } }, }) index.html Reactのコンポーネントを表示するガワとなるhtmlです。 <!DOCTYPE html> <html> <head> <meta charset=\"UTF-8\" /> <meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\" /> <script type=\"module\" src=\"/src/hello.tsx\" defer></script> </head> <body> <div id=\"root\">Waiting...</div> </body> </html> src/hello.tsx ようやくHello WorldするReactコンポーネントの本体です。 画面にはvalueというStateを表示しています。APIのURLは上記の通りproxyとします。 今回作成した/api/hello APIの応答を受けた後、setValueによりStateを更新します。 import React from \'react\' import { useState } from \'react\' import ReactDOM from \'react-dom/client\' const App = () => { const [value, setValue] = useState(\'\') const url = \'/api/hello\' fetch(url,{}) .then(res=>res.json()) .then(data=>setValue(data[\'result\'])) return ( {value},{} ) } ReactDOM.createRoot(document.getElementById(\'root\')!).render( ) 起動してみる docker compose up すると、ほとんど一瞬でviteが起動します。 http://localhost:5173 を開きます。 Waiting...という表示が一瞬で Hello World に書き変わります。 ロールの作成 SPCSの各リソースの作成に必要な権限はこちらにあります。 ゴリ押ししただけなので間違っている可能性大です.. 行ったり来たりしたので足りないものがあるかもしれません。 use role ACCOUNTADMIN; CREATE ROLE IKUTY_CONTAINER_USER_ROLE; GRANT ROLE IKUTY_CONTAINER_USER_ROLE TO ROLE ACCOUNTADMIN; GRANT USAGE ON DATABASE IKUTY_DB TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT USAGE ON SCHEMA IKUTY_DB.PUBLIC TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT CREATE IMAGE REPOSITORY ON SCHEMA T_IKUTA_DB.PUBLIC TO ROLE IKUTY_CONTAINER_USER_ROLE; -- CREATE SERVICEに必要な権限 -- https://docs.snowflake.com/en/sql-reference/sql/create-service#access-control-requirements GRANT USAGE ON DATABASE IKUTY_DB TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT USAGE ON SCHEMA IKUTY_DB.PUBLIC TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT CREATE COMPUTE POOL ON ACCOUNT TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT CREATE IMAGE REPOSITORY ON SCHEMA IKUTY_DB.PUBLIC TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT CREATE SERVICE ON SCHEMA IKUTY_DB.PUBLIC TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT USAGE ON COMPUTE POOL IKUTY_SCS_POOL TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT IMPORTED PRIVILEGES ON DATABASE snowflake TO ROLE IKUTY_CONTAINER_USER_ROLE; -- GRANT READ ON STAGE IKUTY_SCS_STAGE TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT READ ON IMAGE REPOSITORY IKUTY_SCS_REPOSITORY TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE IKUTY_CONTAINER_USER_ROLE; Image Repositoryの作成 SPCSで使用するイメージを配置するリポジトリを作成します。 AWSのECR、AzureのACR的に、dockerコマンドから透過的にpushできるようです。 公式は以下。 CREATE IMAGE REPOSITORY USE ROLE IKUTY_CONTAINER_USER_ROLE; CREATE OR REPLACE IMAGE REPOSITORY IKUTY_SCS_REPOSITORY; SHOW IMAGE REPOSITORIES; SHOW IMAGEを叩くと repository_url が返ってます。 Image Repositoryにプッシュする 作成したImage Repositoryにローカルで作成したイメージをpushしていきます。 pushは指定されたタグを送信するという仕様のため、docker tagコマンドでイメージにタグを付けます。 docker tagの仕様はこちら。 ローカルで以下を行います。(サニタイズのため分かりづらいですが補完してください..) # タグをつける # docker tag $ docker tag app_front:latest /app_front:scs $ docker tag app_api:latest /app_api:scs # Snowflake Image Repositoryにログインする $ docker login -u Login Succeeded # イメージをpushする $ docker push /app_front:scs ... $ docker push /app_api:scs ... Compute Poolを作成する Compute Poolを作成します。 CREATE COMPUTE POOL CREATE COMPUTE POOL ikuty_scs_pool MIN_NODES = 1 MAX_NODES = 1 INSTANCE_FAMILY = CPU_X64_XS AUTO_RESUME = TRUE INITIALLY_SUSPENDED = FALSE AUTO_SUSPEND_SECS = 3600 ; 以下でCREATEしたCompute poolをDESCRIBEできます。 DESCRIBE COMPUTE POOL 自分の環境だと、CREATE COMPUTE POOLしてから15分ほどステータスがSTARTINGでした。 15分ぐらいして叩くとステータスがACTIVEに変わりました。(結構かかるイメージ) 以下、公式の実行例です。 DESCRIBE ikuty_scs_pool +-----------------------+--------+-----------+-----------+-----------------+--------------+----------+-------------------+-------------+--------------+------------+-------------------------------+-------------------------------+-------------------------------+--------------+---------+ | name | state | min_nodes | max_nodes | instance_family | num_services | num_jobs | auto_suspend_secs | auto_resume | active_nodes | idle_nodes | created_on | resumed_on | updated_on | owner | comment | |-----------------------+--------+-----------+-----------+-----------------+--------------+----------+-------------------+-------------+--------------+------------+-------------------------------+-------------------------------+-------------------------------+--------------+---------| | IKUTY_SCS_POOL | ACTIVE | 1 | 1 | CPU_X64_XS | 1 | 0 | 0 | false | 1 | 0 | 2023-05-01 11:42:20.323 -0700 | 2023-05-01 11:42:20.326 -0700 | 2023-08-27 17:35:52.761 -0700 | ACCOUNTADMIN | NULL | +-----------------------+--------+-----------+-----------+-----------------+--------------+----------+-------------------+-------------+--------------+------------+-------------------------------+-------------------------------+-------------------------------+--------------+---------+ Serviceの作成 フロント側、サーバ側の2つのServiceを作成していきます。 specについて、ステージにファイルを配置してそれを指定するスタイルのほかに、 以下のようにCREATE SERVICEに含めるスタイルがあるようです。 CREATE SERVICE フロント側のServiceは以下です。 CREATE SERVICE ikuty_api_service IN COMPUTE POOL ikuty_scs_pool FROM SPECIFICATION $$ spec: containers: - name: api-container image: endpoints: - name: api port: 8080 $$ ; サーバ側のServiceは以下です。 CREATE SERVICE ikuty_front_service IN COMPUTE POOL ikuty_scs_pool FROM SPECIFICATION $$ spec: containers: - name: front-container image: endpoints: - name: front port: 5173 public: true $$ ; SERVICE用のシステム関数 SaaSで動くコンテナの動作を確認するのは結構面倒なことなのかなと思います。 自分の操作に対してSaaS側で何が行われているのか知りたいことは結構あるのかなと思います。 SPCSには以下のコマンドがあるようです。 SYSTEM$GET_SERVICE_STATUS SYSTEM$GET_SERVICE_LOGS エンドポイントURLの取得 SHOW ENDPOINTS すると、Specで指定したpublicなendpointを得られました。 ingress_url に なんとかかんとか.snowflakecomputing.app というURLが入っています。 SHOW ENDPOINTS 動作確認 Computing poolのStatusがACTIVEになってから、エンドポイントURLをブラウザで開くと、 期待通り、Reactで作ったHello Worldアプリが表示されます。 SYSEM$GET_SERVICE_LOGS()でフロントサービスのログを覗くと、viteの起動ログが出ていました。 そうえいば 5173 を 80 に mapping する記述をどこにもしていないのですが、そうなっています。 > front@0.0.0 dev > vite VITE v4.5.2 ready in 314 ms ➜ Local: http://localhost:5173/ ➜ Network: http://10.244.1.3:5173/ ➜ Network: http://172.16.0.6:5173/ 同様に、サーバ側のログを覗くと、uvicornの起動ログが出ていました。 ViteのProxyから8080で繋がるので、こちらは8080が開いています。 INFO: Will watch for changes in these directories: [\'/app\'] INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit) INFO: Started reloader process [1] using StatReload INFO: Started server process [8] INFO: Waiting for application startup. INFO: Application startup complete. まとめ SnowflakeをWebサーバのインフラにするだけの内容で正直意味がないです。 しかし、APIでSnowflakeに触ったり、Reactで格好良い可視化をしたり、夢は広がります。 FastAPI,React,TypeScriptの恩恵ゼロなので、今後ちょっと凝ったものを作ってみます。

default eye-catch image.

Azure Queue StorageとAzure Service Busを比較してみた

Azureで非同期処理を実装する必要があり、Queue StorageとService Busを比較しました。 メッセージの順序保証と消失回避をガチで追求するService Busは考慮すべき事項が大量にあり、 そういう要件がないのであればQueue Storageを使うと楽になることが分かりました。 [arst_toc tag=\"h4\"] 非同期要求-応答パターン アーキテクチャセンターという場所で、Azure全体で検討すべき設計パターンが公開されています。 その中に、非同期要求-応答パターンという項目があります。Azure Functionのドキュメントの中でも、 タイムアウトしそうな長時間処理を実装するならこのパターンにすると良いよ、と説明されています。 HTTPトリガによってキューイングサービスであるAzure Service Busにキュー登録し、 Azure Service Busトリガによって裏で非同期処理を実行する、というものですね。 非同期要求-応答パターン - アーキテクチャセンター シーケンス図がわかりやすいです。 シーケンス図 アプリケーションとジョブ実行機能を疎結合にしたい、という意図がチラチラ見えます。 実際、そこまで疎結合しないのであれば、アプリケーションに統合してしまう作りもあるかと思います。 密結合で良ければ、キュー登録もステータス監視もアプリケーションでやれますので大分シンプルです。 キュー登録、状態監視は非同期処理のコアではなく、コアは以下なのだろうと思います。 キューに積む側が即時応答できること 積まれたトリガで開始する重い処理を分離できること Service BusとQueue Service Azureにはキューを実現する仕組みが2つあります。 厳密にはキューイングサービスはService Busだけですが、ストレージアカウントの1機能である Queue Storageが\"キューそのもの\"であって、よりシンプルにキュー機能を作ることができます。 Azure Queue Storage Service Bus Service Busは、メッセージを決して消失させないガチのキューイングサービスで、 それを実現するために考慮すべき点が大量にあります。順序や消失の対応に命をかけるのでなければ 単なるストレージであるQueue Storageを使った方が気楽です。 両者の機能比較 Queue StorageとService Busの違いについて、公式ドキュメントから拾い集めて表にしました。 ちょっと理解が難しい部分があったので、だいぶ憶測と妄想で補足しています。 (下の方は読み取れず諦めたところがあります..) もともと全然違うサービスなのに比較しようとするから、表の対応が取れない、という問題があります。 公式を読んでもいまいち対応が取れないのは、そもそも機能が違うから、なのだろうと思います。 Storageキュー Service Busキュー 特徴 シンプル。ストレージ容量で課金。単一の送信先(非Pub/Sub)。トランザクション無し。メッセージ内容の更新可。重複検出無し。 多機能。トランザクション有り。1対1に加え多対多(Pub/Sub)が可。メッセージ内容の更新不可。重複検出あり。トピックフィルタあり。 順序の保証 なし一般にFIFOだが状況によって順序が変わる FIFO、セッションIDによるグルーピング(セッション)と同一セッション内の送信順序保持。 転送・ロック・解決 Peek & Lease受信者から「読み出す(Peek)」要求があった場合に他の受信者に該当のメッセージ読み取らせないようにする。(Lease) Peek/Lockモード送信者がブローカー(Service Bus)に送信し受信者の受信が成功/失敗分かって初めて解決とする。メッセージにロックがかかり競合受信者が触れなくなる。送りっぱなし・非同期にしてはいけない。Receive/Deleteモードブローカーが受信者に送信した時点で解決とする。受信者による受信の成功/失敗には関与しない。受信に失敗するとメッセージは失われる。 配信不能キュー(DLQ) 有害キュー。対応する記事がないが、受信に失敗すると特殊なキューが作られてそこに入った。 配信不能レタリング 配信保証 At-Least-Once少なくとも1回。つまり1回は確実に配信されるが、重複(2回以上同じメッセージ)がありうる。 At-Least-Once(PeekLockの場合)少なくとも1回。つまり1回は確実に配信されるが、重複(2回以上同じメッセージ)がありうる、損失なしAt-Most-Once(ReceiveAndDeleteの場合)最大1回。つまり0回=配信されないことがある。重複はない。 トランザクション 非対応 対応 重複検出 非対応 対応 プロトコル HTTP/HTTPS (RESTベース) HTTPS (RESTベース) メッセージサイズ 最大64KB 256 KB または 100 MB メッセージの最大TTL 無限 有限(TimeSpan.MaxValue??) 処理数 最大2000メッセージ/秒 (1KBの場合) (省略) キューサイズ 最大500TB 1 GB ~ 80 GB キューの最大数 無限 10,000 メッセージ保存期間 最大7日 (省略) どちらを使うべきか 公式では、以下が必要な場合はService Busを使うべしとされています。 不要ならStorage Queueとなります。だいたい、あるなら使いたいとなりがちな気がしますが、 そうするとService Busになってしまいます。 扱いやすいStorage Queueを使うなら「必要ではない」を判断する必要があります。 メッセージセッション(FIFO) トランザクション 重複検出 自動的な配信不能レタリング Pub/Sub azure-storage-queueの動作確認 Azure Storage Queueを操作するパッケージは、azure-storage-queueです。 クイックリファレンスによると、以下の機能をサポートしています。 キューを作成する メッセージをキューに追加する キュー内のメッセージを表示する キュー内のメッセージを更新する キューの長さを取得する キューからメッセージを受信する キューからメッセージを削除する キューを削除する 対して、Service Busを操作するパッケージは、azure-servicebusです。 azure-storage-queueのように簡単に機能リストをまとめることはできないため省略。 抽象化のレベルが違うので、これだけでも面倒さが分かります。 まとめ Storage QueueとService Busを比較し、まとめてみました。 次回はFunctionsのStorage Queueトリガ/バインドを使って非同期処理を書いてみます。

default eye-catch image.

Azure Functionsの機能まとめ(座学版)

タイトルの通り、Azure Functionsの機能をまとめてみた。 [arst_toc tag=\"h4\"] 課金モデル 課金モデルが5パターンあるのではなく、運用方式が5パターンあり、それぞれ課金方式が違う。 呼称がMECEでなかったり公式ドキュメントで表記揺れが存在したり親切でない点はある。 Premium、DedicatedはApp Service Planで動かすことができ、かなり微妙に繋がっている。 実質的にPremium、DedicatedはApp Service Planで実現され課金がかかる。 コールドスタートに対する改善の歴史を感じる。 課金モデル 概要 従量課金 オーソドックスなFaaS。名前の通り資源の使用量に応じて課金。必要最低限のネットワーク分離が提供される。既存VNetとの統合は不可。コールドスタート。アプリのロード・アンロードが頻繁に発生し、しばしば遅い。 Premium 資源の使用量に応じて課金。従量課金よりも高機能な従量課金(言葉辛い..)。既存VNetとの統合がサポートされる。コールドスタートを回避するために用意された。インスタンス数をゼロまでスケールインさせないことでホットスタートを実現している。アクティブなインスタンスのコア数(vCPU/h)、メモリ使用量(GB/h)に課金。裏側はApp Service Planだが手持ちのカスタムイメージをACRに登録しApp Serviceにホストすることが可能。 Dedicated 通常のApp Service Planとして課金される。既にApp Serviceインスタンスを実行しており新たにFunctionを相乗りさせる時に使用する。従量課金的な要素が無いので(高価だけれども)コストを予測できる。 App Service Environment(ASE) 超強力なDedicated。1人の顧客に限定された専用環境。ASE v1,v2,v3と脈々と新しい奴が作られている。高スケール、分離およびセキュリティで保護されたネットワーク アクセス、高いメモリ使用率などが書かれている。マルチリージョンにまたがって構成できる。高RPS(Requests per Seconds)ワークロード向けに用意されるApp Serviceの強化版。 Container Apps Hosting Azure Container Appsでコンテナ化されたFunctionsの開発・デプロイ・管理。Kubernetes ベースの環境で関数を実行できる。現在プレビュー。 従量課金とPremiumの違い リッチな従量課金プランであるPremiumについて詳細なドキュメントがある。 Azure Functions の Premium プラン そのメリットとして、以下が列挙されている。 インスタンスをウォーム状態に維持することでコールド スタートを回避します 仮想ネットワーク接続 より長いランタイム期間をサポートします Premium インスタンス サイズの選択 従量課金プランと比較して、予測可能な料金 複数の Function App を含むプランでの高密度アプリ割り当て 従量課金プランは、インスタンス数をゼロまでスケールインできる。 その結果としてその料金の料金はかからない一方、リクエストが来たときにゼロから1個以上まで スケールアウトする際に\"コールドスタート\"時間を要する。 Premiumプランには、\"常時使用可能なインスタンス\"という考え方がある。 要はインスタンス数をゼロまでスケールインさせず、常にアクティブにしておくということらしい。 当然、\"常時使用可能なインスタンス\"は常時課金される。 他に\"事前ウォーミング可能なインスタンス\"という考え方がある。 常時使用可能なインスタンスが負荷分散してリクエストを捌いている間、 事前ウォーミング可能なインスタンスが後で立ち上がる。常時使用可能なインスタンスの負荷が 規定値を超えると、事前ウォーミング可能なインスタンスがアクティブに昇格し捌き始める。 事前ウォーミング可能なインスタンスは昇格するまでの間立派に課金されてしまう。 Premiumプランは実際はApp Serviceの仕組みで動く。 プラン名に規約がありEで始めるとElastic Premium、つまり、App Serviceで動かすPremiumということになる。また、Pで始めると動的スケールしないDedicated Hostingプランということになる。 Azure Functions は Azure App Service プラットフォームで実行できます。 App Service プラットフォームでは、Premium プラン関数アプリをホストするプランは Elastic Premium プランと呼ばれており、EP1 のような SKU 名があります。 Premium プランで関数アプリを実行することを選択した場合、EP1 のように \"E\" で始まる SKU 名を持つプランを必ず作成してください。 P1V2 (Premium V2 Small プラン) のように \"P\" で始まる App Service プラン SKU 名は実際には Dedicated ホスティング プランです。 Dedicated であり、Elastic Premium ではないため、\"P\" で始まる SKU 名のプランは動的にスケールせず、コストが増えることがあります。 実行継続時間 従量課金プランは1回の実行の最大は10分。Premiumプランはデフォルトで最大30分。 ただし、Premiumプランの最大値は延長して無制限まで拡張できる。 プラットフォームのアップグレードにより、マネージド シャットダウンがトリガーされ、関数の実行が停止する可能性があります プラットフォームの停止により、処理されないシャットダウンが発生し、関数の実行が停止する可能性があります 新しい実行がない状態で 60 分経つと worker を停止するアイドル タイマーがあります スケールイン動作により、60 分後に worker のシャットダウンが発生する可能性があります スロットのスワップにより、スワップ中にソース スロットとターゲット スロットの実行が終了される可能性があります これはFunctionのタイムアウト期間であって、HTTPトリガーの応答にはAzure Load Balancerの タイムアウト期間(=230秒)が適用される。HTTPトリガで長時間処理を実現する場合、 Durable Functionで作るか、即時応答・非同期処理のパターンにすべきとのこと。 Function App タイムアウト期間 Durable Functions とは 実行時間の長い関数を使用しない HTTPトリガで長時間処理を実装するパターン 可能な限り、大きな関数は、連携して高速な応答を返す、より小さな関数セットにリファクタリングしてください。 たとえば、webhook または HTTP トリガー関数では、一定の時間内に確認応答が必要になる場合があります。webhook は通常、即座に応答を必要とします。 この HTTP トリガー ペイロードは、キュー トリガー関数によって処理されるキューに渡すことができます。 このアプローチを使用すると、実際の作業を遅らせて、即座に応答を返すことができます。 ネットワーク 既存のAzureリソースとAzure Functionsを連携する際に、どのように既存リソースと連携できるか、 各実現方式毎にやれることが決まっている。以下が参考になった。 Azure Functions のネットワーク オプション 特徴 従量課金 Premium Dedicated ASE 受信アクセス制限 ✅ ✅ ✅ ✅ プライベートエンドポイント ❌ ✅ ✅ ✅ 仮想ネットワークの統合 ❌ ✅ ✅ ✅ VNet Trigger(非HTTP) ❌ ✅ ✅ ✅ Hybrid接続 ❌ ✅ ✅ ✅ 送信IPの制限 ❌ ✅ ✅ ✅ 受信アクセス制限は、送信元のIPアドレスに対するAllow/Denyを設定する機能。 IPv4/v6のIPアドレスを直接指定するか、サービスエンドポイントを使用するVNetのサブネットを指定可。 より詳細な記述は、Azure App Service のアクセス制限を設定するを参照。 プライベートエンドポイントは、VNet内からプライベートエンドポイントを介したPrivateLink接続。 AWS VPCと異なり、Azure VNetはリソースの論理的なグルーピングに過ぎない、という側面があり、 通信を秘匿化したいという文脈でなくても、PrivateLinkを使って連携せざるを得ない事情がある。 プライベートエンドポイントのDNSはAzureが良しなに作ってくれる。 仮想ネットワークの統合(VNet統合)は、Azure Functionsを指定のVNetに論理的に配置するオプション。 これにより、FunctionからVNet内のリソースにアクセスできるようになる。 FunctionからVNet内リソースに対して送信呼び出しを行うために使われる。逆には使われない。 従量課金ではN.G.だがPremiumクラスの従量課金なら可能になる。これはメリット。 リージョン内であれば、VNet側にVirtual Network Gatewayは必要ないがリージョン間であれば必要。 Virtual Network Gatewayを必要とする場合、通信に大きな制約がかかる。 なお、Azure FunctionsをASEで運用する場合、FunctionはASE内に物理的に配置されるため、 論理的なVNet統合を行う必要はないとのこと。 トリガについては後述する。オーソドックスな従量課金モデルはHTTPトリガしかサポートしない。 Premium以降で他のトリガが解放される。 ハイブリッド通信は、Windowsで動作している従量課金以外の全てのFunctionについて、 他のネットワークのリソースにアクセスできる機能。Azure Relayという機能の1つ。 Windowsを使わないといけないため特殊な用途となる。省略。 トリガとバインド トリガーによりFunctionが発火し実行される。つまりトリガーにより関数の呼び出し方法を定義する。 トリガーとバインドについてはAzure Functions でのトリガーとバインドの概念が参考になる。 トリガーにはデータが紐付けられていて、呼び出しの際のペイロードとなる。 バインドとは、別のリソースを宣言的に接続する方法。入力バインド/出力バインドがある。 バインドからのデータは、Functionから見てパラメータとして利用できる。 Azure Functionsのバージョンにより対応可否が異なる。現在のバージョンはv4。 比較的マイナーと思われるものについて、割と昔出来ていたことが出来なくなったパターンが多い。 Kafka、RabbitMQは従量課金プランではサポートされない。 Typev1.xv2.x以降トリガー入力出力 Blob Storage✔✔✔✔✔ Cosmos DB✔✔✔✔✔ Azure Data Explorer✔✔✔ Azure SQL✔✔✔✔ Dapr✔✔✔✔ Event Grid✔✔✔✔ Event Hubs✔✔✔✔ HTTP✔✔✔✔ IoT Hub✔✔✔ Kafka✔✔✔ Mobile Apps✔✔✔ Notification Hubs✔✔ Queue Storage✔✔✔✔ Redis✔✔ Rabbit MQ✔✔✔ SendGrid✔✔✔ Service Bus✔✔✔✔ SignalR✔✔✔✔ Table Storage✔✔✔✔ Timer✔✔✔ Twillo✔✔✔ 例えば、HTTPトリガーとバインドの例は以下。 RESTfulAPI的にURLにペイロードを含めることができる。 (ドキュメントを見ても何が正解が分からないし、もちろんどこかに実行例がある訳でもない) ここで、リクエストパラメタが入力バインド、レスポンスが出力バインド、ということになる..(のかな)。 import logging import azure.functions as func @app.function_name(name=\"httpTrigger\") @app.route(route=\"products/{category:alpha}/{id:int?}\" auth_level=func.AuthLevel.ANONYMOUS) def main(req: func.HttpRequest) -> func.HttpResponse: category = req.route_params.get(\'category\') id = req.route_params.get(\'id\') message = f\"Category: {category}, ID: {id}\" return func.HttpResponse(message) こうしておくと、例えば以下のURLで定義したhttpTriggerを実行できる。 http://.azurewebsites.net/api/products/electronics/357 auth_levelは認可レベル。URLのリクエストに必要な認可キーを指定する。 ANNONYMOUSなら不要、FUNCTIONなら関数固有のAPIキー、ADMINならマスターキー(?)。 詳細はこちら。 まとめ Azureドキュメントを見ながらAzure Functionの概要をまとめてみた。 実装例が少なくまとまったドキュメントが少ない、という問題があり、 座学版の他に「やってみた」を繰り返す必要がありそう。

default eye-catch image.

External Network Accessを使ってSnowflakeとFitbitAPIを繋いでみた話

FitbitはAPIがしっかり整備されていて、OAuth2 endpoint経由でデータが取り放題。 せっかくなので、話題のExternal Network Access(2023年12月現在 PuPr)を試してみようと思う。 つまり、FitbitAPI→Snowflakeをやってみようと思う。 Fitbit APIを使用するにはOAuth2.0 Authorizationを通す必要がある。 Snowflakeの公式にOAuth2.0 Endpoint経由でGoogle翻訳APIと連携する段取りが書かれていて、 それをそのままFitbit APIのものに差し替えるだけで動いた。 外部ネットワークアクセスの例 外部ネットワークアクセスについては以下。 [clink implicit=\"false\" url=\"https://docs.snowflake.com/ja/developer-guide/external-network-access/creating-using-external-network-access#label-creating-using-external-access-integration-network-rule\" imgurl=\"https://www.snowflake.com/wp-content/uploads/2017/01/snowflake-logo-color-300x69.png\" title=\"外部アクセス統合の作成と使用\" excerpt=\"特定の外部ネットワークロケーションへのアクセスを有効にするには、外部ロケーションのリストと使用を許可されるシークレットのリストを指定する外部アクセス統合を作成します。UDF の作成時、あるいは CREATEFUNCTION または CREATEPROCEDURE でプロシージャを作成する際に、 EXTERNAL_ACCESS_INTEGRATIONS 句を使用してこの統合を参照することで、ハンドラーコードが外部ロケーションとの認証コードにシークレットを使用できるようになります。\"] 2016年6月に書いた記事。phpで検証をしていた。 この辺りからバッテリーがダメになる度に新しいFitbit Charge(1,2,3)を買って溜めてきた。 この間、FitbitがGoogleに買われてしまったり、スマホアプリが大幅に変わったり、色々あった。 基本的な機能はずっと動いているので、7年分のデータが溜まっているんじゃないかな、と期待。 [clink url=\"https://ikuty.com/2016/06/07/fitbitapi-authenticate-grant-flow/\"] Fitbit API側の準備 OAuth2連携に必要な情報を dev.fitbit.com から取得する必要がある。 Authorization Code Grant Flow with PKCE こちらを参考にさせていただいた。 [clink implicit=\"false\" url=\"https://www.zenryoku-kun.com/post/fitbit-api#register-app\" imgurl=\"https://www.zenryoku-kun.com/home/sakura-400w.jpg\" title=\"FitbitのWeb APIを実行する方法\" excerpt=\"Fitbit Sense2を購入しました。はじめてのスマートウォッチです。Fitbitデバイスでは、心拍数や歩数等、収集したデータをWeb APIで取得することが可能です。さっそく使って遊んでみようと思ったら、Web APIの認証がなかなか通らない、、、ドキュメントはとても充実しているのですが、OAuth2.0の認証パターンがImplicit Grant Flowの場合、Authorization Code Grant Flowの場合、PKCEを使う場合、、、などなど、情報量がとにかく多く混乱してしまいました。何はともあれ、何とか認証を通して、こんな感じで歩数などのアクティビティ情報や、心拍数や血中酸素濃度(SpO2)を取得することが出来ました。\"] 以下を準備すればOK。 access-token refresh-token client-id Snowflakeでリソース作り Snowsightでポチポチとリソースを作っていく。 USE ROLE SYSADMIN; -- 外部ロケーションを表すネットワークルールの作成 -- CREATE OR REPLACE NETWORK RULE fitbit_apis_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = (\'api.fitbit.com\'); -- 外部ロケーションとの認証に必要なOAuth認証情報を保持するセキュリティ統合の作成 -- CREATE OR REPLACE SECURITY INTEGRATION fitbit_api_oauth TYPE = API_AUTHENTICATION AUTH_TYPE = OAUTH2 OAUTH_CLIENT_ID = \'\' OAUTH_CLIENT_SECRET = \'\' OAUTH_TOKEN_ENDPOINT = \'https://api.fitbit.com/oauth2/token\' OAUTH_AUTHORIZATION_ENDPOINT = \'https://www.fitbit.com/oauth2/authorize\' ENABLED = TRUE; -- セキュリティ統合に含まれる認証情報を表すシークレットの作成 -- CREATE OR REPLACE SECRET fitbit_api_oauth_token TYPE = oauth2 API_AUTHENTICATION = fitbit_api_oauth OAUTH_REFRESH_TOKEN = \'\'; 最後に外部アクセス統合を作成する。 ストレージ統合や、Notification統合など、統合の作成にはACCOUNTADMINが必要で、 同様に外部アクセス統合の作成にはACCOUNTADMINが必要とのこと。 USE ROLE ACCOUNTADMIN; CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION fitbit_apis_access_integration ALLOWED_NETWORK_RULES = (fitbit_apis_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (fitbit_api_oauth_token) ENABLED = TRUE; 外部ロケーション(ネットワーク)にアクセスするUDFsを書くロールを作成する。 UDFsを書く際に、シークレットを参照する必要がある。 UDFsを書けるロールにシークレットのREAD権限を付与しておく必要がある。 以下、そのままでは SECURITYADMINがDB・スキーマに触れないので環境により修正が必要。 USE ROLE USERADMIN; CREATE OR REPLACE ROLE ikuty_fitbitapi_developer; USE ROLE SECURITYADMIN; USE SCHEMA IKUTY_DB.PUBLIC; GRANT READ ON SECRET IKUTY_DB.PUBLIC.fitbit_api_oauth_token TO ROLE ikuty_fitbitapi_developer; GRANT USAGE ON INTEGRATION fitbit_apis_access_integration TO ROLE ikuty_fitbitapi_developer; GRANT ROLE ikuty_fitbitapi_developer TO role SYSADMIN; 本体の実装 PythonでOAuth2 Endpoint経由でFitbit APIにGETリクエストを投げるFunctionを書く。 最初、トークンのexpire時のrefreshを自力で書いていたが、get_oauth_access_token(\'cred\')により、 自動的にrefreshしてくれていることに気づいた。 use role sysadmin; use schema IKUTY_DB.PUBLIC; CREATE OR REPLACE FUNCTION fitbit_python() RETURNS STRING LANGUAGE PYTHON RUNTIME_VERSION = 3.8 HANDLER = \'hello_fitbit\' EXTERNAL_ACCESS_INTEGRATIONS = (fitbit_apis_access_integration) PACKAGES = (\'snowflake-snowpark-python\',\'requests\') SECRETS = (\'cred\' = fitbit_api_oauth_token ) AS $$ import _snowflake import requests import json def hello_fitbit(): with requests.Session() as s: access_token = _snowflake.get_oauth_access_token(\'cred\') url = \"https://api.fitbit.com/1/user/-/activities/steps/date/today/1m.json\" res = s.get(url,headers={\"Authorization\": \"Bearer \" + access_token}) res_data = res.json() return res_data $$; 実行結果は以下。1日毎の歩数を1ヶ月分取得できた(恥...)。 select parse_json(fitbit_python()); { \"activities-steps\": [ { \"dateTime\": \"2023-11-23\", \"value\": \"15570\" }, { \"dateTime\": \"2023-11-24\", \"value\": \"5392\" }, { \"dateTime\": \"2023-11-25\", \"value\": \"8993\" }, { \"dateTime\": \"2023-11-26\", \"value\": \"10525\" }, { \"dateTime\": \"2023-11-27\", \"value\": \"6371\" }, { \"dateTime\": \"2023-11-28\", \"value\": \"2713\" }, { \"dateTime\": \"2023-11-29\", \"value\": \"9252\" }, { \"dateTime\": \"2023-11-30\", \"value\": \"0\" }, { \"dateTime\": \"2023-12-01\", \"value\": \"7947\" }, { \"dateTime\": \"2023-12-02\", \"value\": \"11265\" }, { \"dateTime\": \"2023-12-03\", \"value\": \"8557\" }, { \"dateTime\": \"2023-12-04\", \"value\": \"2366\" }, { \"dateTime\": \"2023-12-05\", \"value\": \"7985\" }, { \"dateTime\": \"2023-12-06\", \"value\": \"8109\" }, { \"dateTime\": \"2023-12-07\", \"value\": \"6852\" }, { \"dateTime\": \"2023-12-08\", \"value\": \"3707\" }, { \"dateTime\": \"2023-12-09\", \"value\": \"12640\" }, { \"dateTime\": \"2023-12-10\", \"value\": \"7122\" }, { \"dateTime\": \"2023-12-11\", \"value\": \"7190\" }, { \"dateTime\": \"2023-12-12\", \"value\": \"8034\" }, { \"dateTime\": \"2023-12-13\", \"value\": \"5228\" }, { \"dateTime\": \"2023-12-14\", \"value\": \"2861\" }, { \"dateTime\": \"2023-12-15\", \"value\": \"6785\" }, { \"dateTime\": \"2023-12-16\", \"value\": \"11720\" }, { \"dateTime\": \"2023-12-17\", \"value\": \"11021\" }, { \"dateTime\": \"2023-12-18\", \"value\": \"0\" }, { \"dateTime\": \"2023-12-19\", \"value\": \"11021\" }, { \"dateTime\": \"2023-12-20\", \"value\": \"0\" }, { \"dateTime\": \"2023-12-21\", \"value\": \"2703\" }, { \"dateTime\": \"2023-12-22\", \"value\": \"3336\" }, { \"dateTime\": \"2023-12-23\", \"value\": \"7497\" } ] } 結論 PuPrのExternal Network Accessを使用して、FitbitAPI→Snowflakeが出来ることを確認した。 (途中、自動的にトークンをrefreshしてくれている、と書いたが、何度かExpireさせないと良くわからない。) 相手がOAuth2.0ならとても簡単に繋ぐことができると思う。 次は、せっかくなのでSiS(Streamlit in Snowflake)で可視化してみたりしたい。

default eye-catch image.

Deep dive into the internals of Snowflake Virtual Warehousesを読んでみた

この記事はSnowflake Advent Calendar 2023シリーズ2の19日目です。 今年はSnowProAdvanced: Architect試験に合格できました。 結局のところ資格試験であるという側面はあるものの、いろいろ役立っている実感があります。 その後、Mediumというメディアで気になる記事を読み漁る、みたいなことを始めました。 正直知らないことばかりです..。 いくつか読んだ記事のうち、これはヤバいなと感じた記事の読書感想文を書こうと思います。 [clink implicit=\"false\" url=\"https://medium.com/snowflake/deep-dive-into-the-internals-of-snowflake-virtual-warehouses-d6d9676127d2\" imgurl=\"https://miro.medium.com/v2/resize:fit:1002/format:webp/0*6KqDj8Y_HxeL11xT.png\" title=\"Deep dive into the internals of Snowflake Virtual Warehouses\" excerpt=\"Snowflake’s Data Cloud provided as Software-as-a-Service (SaaS), enables data storage, processing, analytic solutions, Machine Learning, and running apps & services in a performant, easy-to-use, and flexible manner using “virtual warehouses” which is the primary compute primitive in Snowflake. This post details the internals of virtual warehouses which provide elastic, highly available, and fully managed mechanisms to run a variety of customer workloads on multiple cloud service providers.\"] 訳は間違っているところもあると思います。ご容赦ください。 [arst_toc tag=\"h4\"] 仮想ウェアハウスの基本 まず、コンピュートとストレージが分離し、それぞれ独立してスケールできることが特徴としている。 Snowflakeにおいて、仮想ウェアハウスはコンピュートの最小単位ではあるが、仮想ウェアハウスは 複数のVMからなるMPPクラスタであると言及している。 この記事は、仮想ウェアハウスを説明するために仮想ウェアハウスを構成するVMに言及している。 仮想ウェアハウスの下に物理のVMがいることにフォーカスがあてられている。 SnowflakeのSaaSサービスを実現するコードはMPPクラスタを構成する各VMで動いていて、 ジョブ実行の際、各VMはリソースを直接参照するしVM同士でmeshN/Wを構成して資源を共有する。 (後述) 仮想ウェアハウス同士はストレージを共有しないけれど、仮想ウェアハウス内部のVMは むちゃくちゃ密に連携しあって、計算資源もストレージも共有しあう。 このセクションで、仮想ウェアハウスの設計方針が述べられている。 「可能な限り顧客に選択肢を提供するのを避けSnowflakeがベストを考える」が基本方針である一方、 「仮想ウェアハウスを構成するVMの物理資源を変更できる柔軟性を提供する」と言っている。 以降、仮想ウェアハウスを構成するVMの振る舞いについて書かれている 仮想ウェアハウスのサイズとタイプ 仮想ウェアハウスのタイプはCPUとメモリの比率、サイズはCPUとメモリの総量を決める。 タイプは、StandardとSnorpark-optimizedの2種類。 Snowpark-optimizedは、Standardの16倍のメモリ量と10倍のSSDを持つ。 メモリ増量により計算が高速化する。ストレージが大きいとキャッシュや中間生成物が 後続の実行で再利用され高速化する。 中間生成物の書き込みに対し、第1に仮想ウェアハウス上のVMのメモリが使われる。 メモリを使い切ったとき、VMのローカルSSDが使われる。 SSDも使い切ったとき、S3等のリモートストレージが使われる。 QUERY_HISTORY viewにSSD、リモートストレージにスピルした量を出力するので、 メモリが溢れないようにするか、少なくともSSDには乗るようにサイズを増やせよ、と言っている。 (やはりストーリーがストレートでわかりやすい..) SELECT QUERY_ID ,USER_NAME ,WAREHOUSE_NAME ,WAREHOUSE_SIZE ,BYTES_SCANNED ,BYTES_SPILLED_TO_REMOTE_STORAGE ,BYTES_SPILLED_TO_REMOTE_STORAGE / BYTES_SCANNED AS SPILLING_READ_RATIO FROM \"SNOWFLAKE\".\"ACCOUNT_USAGE\".\"QUERY_HISTORY\" WHERE BYTES_SPILLED_TO_REMOTE_STORAGE > BYTES_SCANNED * 5 - Each byte read was spilled 5x on average ORDER BY SPILLING_READ_RATIO DESC ; マルチクラスタ ウェアハウス マルチクラスタは、ジョブの同時実行性を高めるためにクラスタを静的/動的に追加する仕組み。 クラスタ内のVMは相互に関係し合いリソース共有して複数台でジョブのオフロードを行うため、 単一クエリのパフォーマンスアップに寄与する。一方で、クラスタ間はリソース共有しないため、 増えたクラスタ内のVMはジョブのオフロード先の融通にはならず、同時実行時の性能劣化予防に働く。 他にスケーリングポリシーの話や、Min/Max設定による静的/動的追加の話が書かれているが省略。 UpではなくOutの方が費用対効果が高い例として、interleaved workloadsが挙げられている。 Outで増やしたクラスタがダラダラと回り続けるケースが除外できず理論値ではあるけれども、 Upに対するOutのメリットを言う場合に説明しやすい図だなと思った。 この辺りモヤモヤしていたのでバシっと説明してもらえて助かりました。 柔軟性-ステートレスなスケーリング 需給調整の文脈ではなく、自動起動と自動サスペンドの文脈で仮想ウェアハウスの状態が書かれている。 リソースがステートレスであれば、需要の増減と関係なくリソースを増減できる。 仮想ウェアハウスはステートレスリソースであって、需要の発生によりプロビジョンングされ、 需要の消滅により仮想ウェアハウスに紐づくリソースが破棄される。 仮想ウェアハウスにジョブが送信されると、クラスタ内のVMはジョブ実行中にのみ存続するプロセスを 生成する。プロセスが失敗した場合、自動的に再試行される。 ユーザとウェアハウスは多対多の関係であり、ウェアハウスから見ると同時に複数の需要が発生する。 異なる組織・部署がウェアハウスを使用するケースにおいて、ウェアハウスは同時にそれぞれを処理する。 各々のウェアハウスは同じ共有テーブルにアクセスできるが、その際、データのコピーをウェアハウス内に 持たなくても良いように作られているので、各組織・部署の処理が他の組織・部署に暴露されるリスクを 回避できるようになっている。 異なる組織・部署が実行したジョブがウェアハウス上で相互作用しない、という事実があり、 組織・部署から見れば、他の組織・部署に全く影響されず自由にウェアハウスを利用できるという 書き方になっていて、ちょっと抽象度が高いですが「ステートレス」が説明されていました。 柔軟性-マルチクラスタ オートスケーリング スケーリングポリシーの説明。 スケーリングポリシーの設定により、各クラスタの自動起動・シャットダウンの相対的な速度を制御する。 スタンダードポリシーはクレジット消費削減よりもクラスタ追加を優先し、クエリ所要時間を最小化する。 エコノミーポリシーの設定により、クラスタを追加するよりも現在実行中のクラスタを全開で回すことが 優先され、結果としてクエリがキューに入りやすくなり所要時間が延びるが、クレジット消費は減る。 この説明は公式通り。 柔軟性-ゼロへのスケール Auto-resumeとAuto-suspendの説明。 ウェアハウスに対する需要がなくなって一定期間経ったら自動的に停止する。 ウェアハウスに対する需要が発生したら自動的に再開する。その時間等を調整できる。 これらの設定はクラスタではなくウェアハウスに対して設定する。 これも説明は公式通り。需要がなくなったら1個も起動していない状態にできることが主張ポイント 柔軟性-自動Suspend期間の管理 Suspendは、つまり仮想ウェアハウスを構成するVMのリリースなので、VMが持つSSDに 蓄えられたキャッシュは同時に破棄されてしまう。これは、後続のジョブが発生したときに クエリ結果キャッシュが効かなくなることに繋がる。 公式の通り、「ウェアハウス稼働時間(クレジット消費)」と「クエリパフォーマンス」がトレードオフの 関係となる。需要がなくなってすぐにウェアハウスを止めると確かにクレジット消費は減るが、 キャッシュヒット率が下がる。トレードオフにSweet spotがあるので探しましょうと書かれている。 これに留まらず、どういう風に決めたら良いかガイドが書かれている。 ただ、これは答えが無い問題で、実験してねとも書いてある。 - タスク実行、ロード、ETL/ELTユースケースにおいて、すぐに止めた方が良い。 - BI等SELECTが起きるユースケースは、止めるまで10分待つべき。 - DevOps,DataOps,Data Scienceのユースケースは、停止時間は5分が最適。 とりあえず、タスク実行、ロードでは、自動Suspend期間を持たせる意味はないので、 そこは、バッサリ最速で落とす勇気が出る書き方で参考になりました。 全てのクエリのうち、SSDからスキャンした割合を集計するクエリは以下。 この割合が低いということは、ウェアハウスのSuspendが早すぎることを示している。 SELECT WAREHOUSE_NAME ,COUNT(*) AS QUERY_COUNT ,SUM(BYTES_SCANNED) AS BYTES_SCANNED ,SUM(BYTES_SCANNED*PERCENTAGE_SCANNED_FROM_CACHE) AS BYTES_SCANNED_FROM_CACHE ,SUM(BYTES_SCANNED*PERCENTAGE_SCANNED_FROM_CACHE) / SUM(BYTES_SCANNED) AS PERCENT_SCANNED_FROM_CACHE FROM \"SNOWFLAKE\".\"ACCOUNT_USAGE\".\"QUERY_HISTORY\" WHERE START_TIME >= dateadd(month,-1,current_timestamp()) AND BYTES_SCANNED > 0 GROUP BY 1 ORDER BY 5 ; 柔軟性-ウェアハウス内のVMは起動済みVMのプールから割り当てられる VMをコールドから起動するには10秒オーダーの時間がかかる。そもそも小規模のクラウドサービスでは VMの数が不足して流動性がない場合もあり、起動済みのVMをプールして再利用することで、 これらの問題を解決しようとしている。 Snowflakeは、VMの起動、終了、停止、再開、スケーリング等のオペレーション時間に対して、 内部でサービスレベル目標を設けている。 (これらの時間がサービスレベル目標から外れるとSnowflake内部でインシデント管理されるらしい。) ユーザのリクエストで需要が発生した場合、起動済みVMのプールからVMが選ばれ、 ウェアハウスに割り当てられる。 起動済みVMのプールのサイズは、過去の需要のベースラインとスパイクから予測されているらしい。 確かにウェアハウスが瞬時に起動する仕組みが気にはなっていました。 妥当な仕組みで成立しているようですが、言及されている点がポイントかと思います。 柔軟性-需要のバーストに対して用意されるQAS サイズアップの他にQAS(Query Acceleration Service)というサービスが存在する。 起動済みVMプールにあるVMを需給に応じて自動的にウェアハウスに組み入れる。 ウェアハウス内でVMは密に連携してクエリをオフロードし合う。 動的なサイズアップであって、疎連携のマルチクラスタとは異なる。 QASは主に、巨大なテーブルのScanや、burstyなワークロードを目的とする。 QASを使用すると、大規模なクエリが検知された場合にウェアハウス内のVMが ウェアハウスから離れ、他のユーザの小規模なクエリに使われるらしい。 通常はウェアハウスのサイズアップよりも低いコストで目的を達成できるそう。 この手の機能が何故ワークロードを高速化するのか、結局のところ中身を知らないとわからないと 思うので、機能の説明の他に、どういう作りなのかを書いてくれるととても参考になる気がする。 When to useはburstyなワークロードということ。 QASで恩恵を受けられるクエリがどれぐらいあるか気になるところ。 公式によると以下の特徴を持つクエリはQASの恩恵を得られないそう。 フィルターや集計(つまり、 GROUP BY)がない。Query Acceleration Serviceは現在、このようなクエリを高速化できません。 フィルターの選択性が十分ではない。または、 GROUP BY 式のカーディナリティが高くなっている。 十分なパーティションがない。スキャンするために十分なパーティションがないと、クエリアクセラレーションの利点は、サービス用に追加のサーバーを取得する際の待機時間によって相殺されます。 クエリに LIMIT 句が含まれている。ただし、 ORDER BY 句を含んでいる LIMIT 句はサポート されます。 QASの恩恵を得られるクエリとウェアハウスは以下のビューから探すことができる。 -- アクセラレーションの対象となるクエリ実行時間の量によって、 -- サービスから最もメリットを受ける可能性のあるクエリを識別します。 SELECT query_id, eligible_query_acceleration_time FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ACCELERATION_ELIGIBLE ORDER BY eligible_query_acceleration_time DESC; -- Query Acceleration Serviceの特定の期間中、 -- 対象となるクエリが最も多いウェアハウスを識別します。 SELECT query_id, eligible_query_acceleration_time FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ACCELERATION_ELIGIBLE WHERE warehouse_name = \'mywh\' ORDER BY eligible_query_acceleration_time DESC; QASにより、ウェアハウスは需給調整のためにVMをリース(借りる)する、という表現がある。 ウェアハウスがリースできるVMの数の最大値は、Scale Factorという数値で表される。 要は、通常のウェアハウスサイズで確保するVMの数の何倍のVMをリースできるか。 例えば、Scale Factorが5、VMのサイズがM(つまり4credsits/hour)の場合、 4*5=20 credits/hourまで増強することになる。 Scale FactorはQUERY_ACCELERATION_ELIGIBLEビューにあり、 クエリID単位で知ることができる。 SELECT MAX(upper_limit_scale_factor) FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ACCELERATION_ELIGIBLE WHERE warehouse_name = \'mywh\'; 仮想ウェアハウスのジョブスケジューリング スループット最大化、レイテンシ最小化、クラスタ使用率最大化、異なる需要に対して供給のために、 ウェアハウスの負荷を追跡・調整するウェアハウススケジューリングサービス(WSS)が備わっていて、 クエリがクラウドサービスレイヤでコンパイルされた後、WSSがジョブスケジューリングを行う。 WSSは各VMのCPU・メモリ使用量を追跡する。ウェアハウスのメモリキャパシティは、 各VMの実効メモリ(OSやソフトウエアの使用を除く)にウェアハウス内のVMの数を掛けたもの。 メモリが使い果たされたことを検知して、データをdiskに吐き出す(Spill)。 メモリ負荷が高くなりすぎると、VMは落とされて\"リタイア\"(前述)する場合がある。 情報科学の用語の1つにDOP(Degree Of Parallelism)がある。 WSSは1個のジョブを何個のプロセスで同時処理して完了するか、という制御を行なっているらしい。 VMのCPUコアが1つのプロセスを受け持ち、CPUコアの数だけプロセスを並列実行できる。 例えばCPUコアを8個もつVMを4個もつウェアハウスの保持コア数は合計32個。 1つのジョブを32コアで並列処理しても良いし、逆に32個のジョブを1コアで処理しても良い。 DOPはコンパイル時に推定される。 以降、ジョブスケジューリングの少し詳しい説明が書かれている。 実行中の各ウェアハウスは既にキューにジョブが積まれている。 その上で新しいジョブを処理する場合、どのウェアハウスで処理すべきかを決めることになる。 WSSはウェアハウスの全てのVMに均等に負荷分散されるべき、という仮定を立てる。 クラウドサービスレイヤは、ジョブの処理に必要なメモリとコンパイル時に決まったDOPから、 そのジョブをどのウェアハウスで処理するかを決める。 メモリの使用状況や同時実行性(?、キューに積む時点でジョブがどれぐらい並列実行されているか??) を見て、ウェアハウスの適格性を決める。適格性が同じなら、その時点で同時実行ジョブが最も少ない ウェアハウスを選択する。適格なウェアハウスが無い場合、WSSキューに残り続ける。 ジョブスケジュールを行うと、各ウェアハウスのリソース使用状況バランスが変化する。 WSSはクラウドサービスにVM使用状況のレポートを送る。 クラウドサービスは状況次第でDOPを下げる(より少ない並列度で処理するよう計画される)。 DOPを下げた後、ジョブはウェアハウスで実行される。ジョブ終了後リソースは解放される。 負荷に応じてDOPがダイナミックに調整されている様が書かれている。 実際のところ、DOPの推移を観察することはできないのと、DOPの上げ下げとパフォーマンスの 関連が本当にその通りなのか不明なこともあり、結局良くわからない。 並列レベルの制御 MAX_CONCURRENCY_LEVELパラメタにより、最大並列処理数を設定できる。 デフォルト値は8ということなので、最大で4個のジョブを並列実行することになる。 巨大なクエリを処理する場合、1個のジョブを受け持つコア数を増やすことでスループットが上がる 場合があるらしい。並列処理数が下がるとキューに積まれるジョブが増えることに繋がる。 ウェアハウスサイズを増やさずにMAX_CONCURRENCY_LEVELだけ調整しても、 リソースの総量は変わらないはずだし、簡単に最適値が見つかるなら全自動で決めてくれる のだろうから、きっと難しい話なのだろう。QASみたいに全然違う何かを使うと良いよ、と書かれている これは公式の以下のドキュメントが対応する。 同時実行クエリの制限 リソースモニタと使用量制限 クレジットを想定よりも多く消費しないようにするアラートとハードリミットの仕組み。 消費クレジットが制限を超えたことをトリガにアラート、自動停止を実行できる。 リソースモニタが設定されていないウェアハウスを以下のクエリで見つけて設定せよとのこと。 SHOW WAREHOUSES ; SELECT \"name\" AS WAREHOUSE_NAME ,\"size\" AS WAREHOUSE_SIZE FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) WHERE \"resource_monitor\" = \'null\' ; ウェアハウスの負荷とサイズの決定方法 Snowsightでウェアハウスの負荷を確認できる。これの計算方法などが書かれている。 確かに、あれ、何をどうやって集計したチャートなのか知らなかった。 Snowflakeが出力するメトリクスを見てウェアハウスの正しいサイズを決定せよとのこと。 ウェアハウスのジョブ負荷メトリクスは、一定期間内の実行ジョブ数、キューに入ったジョブ数の 平均である、とのこと。実行ジョブ数の平均は、全てのジョブの実行時間(秒)を期間(秒)で 割った値であるとのこと。これはバーの青色の部分だな。 Private Previewで、ウェアハウスの使用率メトリクスが用意されるらしい。 以下の表のように、ウェアハウス単位、クラスタ単位で100分率の値を得られる。 ウェアハウス負荷や使用率によって、キャパシティ割り当てを行うべきとのこと。 どういう数字だったらどうすべきか書かれている。そういえば知らなかった。 ワークロードのスループット・レイテンシが適切で、キューに入ったクエリが少なく、 長期にわたりクエリ負荷が1未満、かつ、使用率が50%を切る場合、 ウェアハウス・クラスタのダウンサイズを検討する。別のウェアハウスを起動し、 キューに入れられたジョブをそのウェアハウスで実行できるようにする。 ワークロードのスループット・レイテンシが期待よりも低速で、かつ、 クエリ負荷が低く、かつ、使用率が75%を超えるなど高い場合、 ウェアハウスのアップサイズを検討するか、クラスタの追加を検討する。 使用量の急増(スパイク)が繰り返し発生する場合、 ウェアハウスの追加・クラスタの増量を行い、スパイクに対応するクエリをそれに移す。 スパイク以外のクエリを小さいウェアハウス・クラスタで実行されるようにする。 ワークロードが通常よりも大幅に高い場合、 どのジョブが負荷に寄与しているのか調査する。 ウェアハウスが定期的に実行される(スパイクではない)が、かなりの期間にわたって 合計ジョブ負荷が1未満である場合、 ウェアハウスのサイズダウン、クラスタの削減を検討する。 ストレージ・キャッシュ-ストレージアーキテクチャ Snowflakeには、テーブルの永続化、JOIN等のクエリ演算子によって生成されクエリの実行中に消費される 中間データの2つの形式のストレージがある。 永続化テーブル 寿命が長い永続化テーブルは、S3等のオブジェクトストレージが使われる。 オブジェクトストレージは比較的スループットが高くないが、長期間保管する際の可用性要件が良い。 S3等のブロックストレージに対して一括上書きすることになるが、immutableなデータを 扱うには適している。ブロックストレージの上でimmutableなデータの水平展開を行う。 (別のMedium記事で、micro-partitionはテーブルのバージョニングであって、immutableな データ領域を重ねていくことと、その仕組みにより副作用的にTime-Travelが用意されることが 書かれている。micro-partitionがブロックストレージ上で増えていく様は面白い) immutableなファイルには列データ、属性データがグルーピング・圧縮され格納されている。 相対位置が付与されていて再構成しやすい。 ブロックストレージに備わっている「部分的な読み取り」機能により、これらのファイルの 必要な部分を取得する。こうして永続化テーブルがブロックストレージに保管・使用される。 JOIN等のクエリ演算子によって生成されクエリの実行中に消費される中間データ 中間データは寿命が短く低レイテンシ・高スループットが求められる。 ジョブの実行にウェアハウスのメインメモリとSSDが使われる。 これらはウェアハウスの開始時に作られ、終了時に破棄される。 これらの一時ストレージは、リモートにある永続化テーブルのライトスルーキャッシュとして機能する。 各仮想ウェアハウスはそれぞれ個別に一時ストレージを持ち、クエリ実行時に使用される。 この一時テーブルは、全ての仮想ウェアハウスから\"個別にコピーすること無しに\"共有できる。 メモリ管理を単純化するためのSpill 中間データの書き込み操作の際に、まずウェアハウス内のメインメモリが使われる。 メインメモリがfullになると、ウェアハウスのローカルdisk(SSD)が使われる。 ローカルdiskがfullになると、リモートストレージが使われる。 メモリ不足、ディスク不足を回避するための仕組みになっている。 事実としては良く知られた挙動だけれども、それと「メモリ管理の単純化」というストーリーが 紐づいて理解しやすくなった気がする。 ストレージ・キャッシュ-キャッシュ戦略 「キャッシュ」とは、良く使うデータを取り出しやすいところに一時的に保存しておくもの。 キャッシュ容量は限られるため、ヒット率を維持しつつ効率的に中身を更新することが重要。 その具体的な仕組みとして、LRU (Least Recently Used)、LFU (Least Frequently Used)が有名。 キャッシュが必要な中間データ(前述)量が小さい場合、一時ストレージレイヤ(=VMのdisk)は、 ファイル名のハッシュ値を使ったLRUキャッシュにより、頻繁にアクセスする永続化データの キャッシュとして使われる。このキャッシュは低優先度で\"lazy\"に行われるらしい。 ファイルが仮想ウェアハウスのどのVMにストアされるかについて「一貫性」が言われている。 一方向関数にファイル名を食わせた結果、ファイル名とストア先VMが決まることを言っている。 サイズ変更によってVMの追加・削除が行われる際にキャッシュがシャッフルされてしまわない。 (VMのサイズが同じならば)永続化ストレージ上のファイルは特定のVMに保存されるため、 永続化ストレージ上のファイルに対する操作は、そのファイルのハッシュが保存されるVMが 実行するようにスケジューリングされる。こうして、ジョブの並列化はファイルのハッシュ値が 一貫して同じVMに保存されることと密接に結びついている。 ファイル名が偏っているとハッシュも偏り、保存先のVMが偏る場合がある。 それを回避するため、ワークロードがそのVMでの所要時間が他のVMでの所要時間よりも 小さいかどうか、に基づいてクラスタ内のVM内でロードバランシングが行われる。(え..?) キャッシュ(execution artifacts)が移動した場合(キャッシュアウトした場合)、 最初に実行がスケジュールされていた既に過負荷になっているVMの負荷がさらに増加する のを避けるため、操作の実行に必要なファイルが永続化ストレージから読み取られる。 仮想化の問題、ネットワークの問題など様々な理由で一部のVMが極端に遅い時があるらしい。 その対策にもなっているらしい。 Snowflakeのスケジューリングロジックは、execution artifactsを永続化ストレージ のキャッシュ先と同じVMに配置することと、全てのexecution artifactsを少数のVMに 配置することの間のバランスを見つけようとする。 前者は永続化ストレージのReadに伴うネットワークトラフィックの最小化を目指すが、 ジョブがウェアハウス内の全てのVMにスケジューリングされることによって中間データが VM間でやり取りされることに起因してネットワークトラフィックが増加するリスクもある。 後者は中間データ交換のためのネットワークトラフィックがなくなる(減る..?)が、 永続化ストレージのReadのためのネットワークトラフィックが増加する可能性がある。 一時データ容量はリモートの永続化ストレージ容量よりもかなり小さい(平均0.1%未満) にも関わらず、Snowflakeのキャッシュスキーム上では、Readのみのクエリで-80%、 Read-Writeがあるクエリで-60%のキャッシュヒット率にもなるらしい。 文章だけでは読みきれないな..。ただキャッシュの仕組みが書かれているだけでなく、 永続化ストレージ上のデータ(=ファイル)をVMに持ってくる仕組みの説明になっていて、 ウェアハウス内のVMで負荷分散して処理していく様が薄ら分かった気がする。 マルチテナント環境におけるセキュリティとリソース分離 アカウント、ジョブごとにデータを分離し、アカウント、ジョブ間でデータが漏洩しないように 設計している。\"仮想マシンを分離すること\"により、各テナントの分離を実現している。 さらに、cgroup、カーネル名前空間、seccomp(※)のようなDockerコンテナに似たカーネルプリミティブ を備えたVM内のサンドボックスにより、同一顧客アカウント内のジョブ間の情報漏洩を防ぐ。 ※cgroup,カーネル名前空間,secompはLinuxカーネルの機能で、 Dockerコンテナの内部で使われている。 cgroup,namespaceは、プロセスグループのリソース(CPU、メモリ、ディスクI/Oなど)の利用を 制限・隔離するLinuxカーネルの機能とのこと。seccompは自プロセスが発行するシステムコールを 制限してプロセスを乗っ取られたとしても被害を最小限にする機能とのこと。 各VMを独自のハードウェア、ページテーブル、カーネルを使用して動作させることで、 マルチテナントセキュリティとリソース分離を図っている。 VMが同じハードウェア、ページテーブル、カーネルを使用した\"VM分離\"がない場合、 従来から使われているカーネルカーネル共有方式(cgroup,名前空間,secomp付き)だけでは、 Snowflakeのセキュリティ基準に達しないと判断したそう。(そうですか..)。 \"VM分離\"するよりもカーネルを共有した方が、コンテナは高速に起動して都合が良いけれども、 カーネルを共有するということは、過去のCVEsから予想されるセキュリティ脆弱性に曝露される ことになる。 仮想ウェアハウスを構成するVMはそのウェアハウスが占有するプライベートなリソースであって、 仮想ウェアハウス間で共有されたりはしない。加えて仮想ウェアハウスはステートレス。 データの状態に影響されず、需要に応じてどんな時でも作成・破棄・リサイズできる。 その仕組みのため、ジョブが特定の仮想ウェアハウスで限定して実行されるから、 その仮想ウェアハウスのパフォーマンスが他の仮想ウェアハウスのパフォーマンスに影響しない。 ジョブ実行の際、各仮想ウェアハウス内のVMが新しいプロセスを起動する。 そのプロセスはジョブの実行期間中にのみ生存する。 プロセスの失敗は自動的に検知され即座に修正(再実行)される。 ユーザは、いつでも複数の仮想ウェアハウスを実行できる。 各ウェアハウス上で、複数のジョブが並列実行する。 ネットワークセキュリティ 仮想ウェアハウスは次の外部ネットワークアクセスを必要とする。 クラウドサービスレイヤとの通信 ジョブ実行時に発生する他の仮想ウェアハウスとのデータ共有 ローカルのクラウドストレージ(diskのspill先)へのアクセス API Gatewayへのアクセス Snowflakeは全ての仮想ウェアハウスからのネットワークトラフィックを信用しない。 内部サービスへのトラフィックは必ず認証済みのエンドポイントを経由する。 外部ネットワークへのトラフィックは外向きプロキシを経由し、アクセス制御ポリシーが適用される。 未認証のエンドポイントへのアクセスはブロックされ、予期しない動きはSnowflakeに報告される。 アカウント間で予期しない漏洩が起こらないように、VM、proxy、ジョブ間でやり取りされる全ての 通信が正常であることを、クラウドサービスレイヤがIPアドレスマッチングを行うことで検証する。 仮想ウェアハウスが持つ署名済みの共有シークレットを使って、仮想ウェアハウス間の全ての通信 について、発信・着信側が本当にSnowflake内部の仮想ウェアハウスであるか検証する。 そもそも仮想ウェアハウスからクラウドサービスレイヤへの通信がむちゃくちゃ多くなり、 DoS攻撃のようにならないように、通信にレートリミットがついていたりするらしい。 他には、フローログを使って何かをしているらしい。フローログって何か知らなかったので調べた。 NWインターフェース間で行き来するIPトラフィックに関する情報をキャプチャする機能。とか。 Wireshakみたいなやつだろうか。例えば、仮想ウェアハウス内のVMが知らないdestに対して 送ったIPトラフィックを見つけてforensic inspectionを行いVMを隔離するなど。 ※デジタルフォレンジック。「証拠保全」みたいな使われ方をしている。 うーん..難しい... ネットワークセキュリティと言うと、つい外部から内部(Ingress)の事かなと思っていたが、 SaaSの内部で好き放題されてしまうリスクがある気持ちを理解した。 外部ネットワークアクセスはこの気持ちの上に成立しているんだろう。 Python/Scala/Javaコードの分離 SQLみたいに出来ることが制限されている言語とは違い、何でもできるJava/Python/Scalaで UDFやプロシージャを書くことはセキュリティ面でリスクがいっぱい。 これらの言語で書いた処理は、パフォーマンスの観点で、ジョブの他の処理と同じVM上で動く。 マルチテナント環境上で(処理を?)分離するために(前述のように再利用できない)VMを使用する のに加え、cgroups, namespaces, secomp, eBPF, chrootのようなLinuxカーネル の要素を使ったセキュアなサンドボックスを提供することで、ジョブに割り当たったスコープの外の 情報にアクセスしたり、処理がSnowflakeの他の機能に影響したりしないようにしている。 (これらは前述されている。それぞれうっすら調べてみた。こういう風に作るんだなぁと面白い) Java/Python/Scalaで書かれた各ジョブには、実行用に新たにサンドボックスが割り当てられる。 コードの実行に最低限必要なread-onlyのソフトウエアが用意される。 サンドボックス用のchrootが用意され(/より上に行けない)、その下には書き込み可能ディレクトリが いくつかあるだけ。ジョブはそこで処理を行う。read-onlyなディレクトリがマウントされて、 JavaのJARパッケージ、Pythonパッケージや、データファイルはそこで共有される。 サンドボックス内のジョブ(のリソースを使用するプロセス)はcgroupが設定され、 使用メモリ、CPU使用量、PID使用量(プロセス数?)が制限される。 マルチプロセッサユースケース(マルチスレッド化してプロセス内で処理を並列化する話?)のため スレッド生成がサポートされる。 さらに、許可リスト(IPC,Inter Process Communicationに関するリソースを隔離する仕組み= IPC Namespace、eBPF,extended Berkley Packet Filter=カーネル内で発生した イベントで駆動する処理を安全・簡単に組み込む仕組みによって、予め許可していないartifacts がサンドボックスの外に接続するUNIXソケットを開けないようにする)によるネットワークアクセスの 制限、process namespaceによるVM上の他のプロセスを見えなくする制限、 seccomp(子プロセスのフォーク、実行可能プログラムの実行)によるカーネルAPIの不必要な 実行の回避が行われる。脅威検知のためptraceがシステムコールを管理する。 ジョブが完了した後、VM上の環境のもろもろの解放、開いたソケットのクローズ、 クレデンシャルの削除、ローカルキャッシュ、一時ファイル、ログの削除が行われる。 追加の多層防御手段?(defense-in-depth measure?)として、規定時間内に終了しなかった Python/Job/Scalaコードを実行するプロセスに対して、監視プロセスがkillシグナルを送る。 サンドボックス外に離脱したり、攻撃者が仮想ウェアハウス上のVMにプロセスを残したり ルートキットを配置する未知のリスクに備えて、Python/Java/Scalaコードを実行したVMは 「実行不可」としてマークされる。仮想ウェアハウスのスケジューリングや起動済みVMをプールする 仕組みの上で、Python/Java/Scalaコードを実行したVMが異なるアカウント・ユーザに 割り当てられると、アカウント間情報漏洩のリスクに繋がってしまうため、異なるアカウント・ユーザに 割り当たらないようになっている。Python/Java/Scalaコードを実行するVMが作られると、 アカウント専用のVMプール入れられる。新しいVMを割り当てるときは、まずはアカウント毎の空き プールからVMが選ばれる。 多数のゼロデイエクスプロイト(脆弱性が発見されてからパッチが当たるまでの期間の攻撃)が 連続して使用されると、サンドボックスが破られてしまうかもしれないが、それに備えた作りに なっている。まずエクスプロイトは、ユーザアカウントで実行中のVMに存在する。このVMは、 Snowflakeサービスや、Snowflake内のローカルネットワーク上のVMから隔離されている。 攻撃者が手にしたクレデンシャルは(サンドボックスを破壊した)特定のアカウントの特定のVMに 限定され他では使用できない。 あくまで論理的な構成が書かれているだけで「コンテナ」というワードも無いし、何かチラチラとするな。 こういうのを「コンテナエスケープ」とか言うらしい。 ソフトウエア更新の管理 Snowflakeの各機能がどうやって仮想ウェアハウスにデプロイされるかについて。 (デプロイの)ワークフローにより新機能、セキュリティアップデート、機能改善が行われる。 全ての処理は自動化されていて手作業の間違いが起きないようにしている。 このリリースプロセスにおいて、単体テスト、回帰テスト、結合テスト、性能、負荷テストが行われる。 リリースプロセスは、本番の前段の環境、または本番に近い環境で行われる。 VMがフリープールに入る前に最新のパッチが当たる。VMのStartやResumeなどの操作の後に、 フリープールからVMに割り当たったり、逆にVMからフリープールに抜けたりするが、 フリープールからVMに割り当たるプロセスの一部として、VMに最新に保つための最新のバイナリが ダウンロードされ、適用される。 Resume、Startなどのライフサイクル操作は即座に終わるように作られているが、 影響を与えないように性能要件が与えられているらしい。 SKU sizeやOSのメジャーパーションなど大きな変更の際には、未適用のVMと適用済みのVMの両方が 同時に動く状態となる。古い方は既存のジョブを実行し、新しい方は、新しいジョブを実行する。 そのようにジョブがルーティングされる。 既存のジョブを実行し終わってから、最終的に古い方は消される。 つまり、1個のウェアハウスについて、アップデートの時期を迎えると背後で(適用前後の)2個になる。 前述のようにキャッシュはVMのローカルディスクなので、もし古いウェアハウスが破棄されたとすると、 キャッシュが失われることになる。 それによりキャッシュミスが発生しパフォーマンスに影響しないように、事前に管理されているとのこと。 がんばってテストしているけれども運用環境にバグが混入することもある。 なのでアップデートをロールバックできるようになっている。 クラウドプロバイダのリージョン毎に、動作中のバイナリの背後で、古いバイナリをコピーしている。 古い方は非アクティブのままとしている。(トラフィックが発生しない?) 大規模障害に備えて、通常、新しいジョブを新しいバージョンのウェアハウスにルーティングしている ものを古いウェアハウスにルーティングするロールバックをできるようにしている。 Issueに基づいて顧客ごとに対象を絞ったロールバックをすることもあるらしい。 顧客のワークロードはそれぞれ大分ことなるので、全員が同じ頻度でバグを踏むことはないので。 特定の顧客に対して、アップデートした一部のリリースをロールバックする、みたいなことをするらしい。 リリースノートの扱いが良い感じになっていて、こういう感じで運用されているのだな、と。 将来の機能 現在、ユーザは、ワークロードの複雑さ、処理時間、コストを考慮して適切な調整を行わないといけない。 例えば、サイズ、ウェアハウスタイプ、クラスタ数、スケーリングポリシーなど。 こういったキャパシティ調整の大変さを減らしたり無くそうとしているらしい。 microVM(例えばFirecrackerやKata Containersなど)やシステムコールのオフロードに 投資し、より強力なサンドボックス分離メカニズムを実現しようとしているらしい。 それにより、Python/Javaコードで現状ではできないことが出来るようにしたいらしい。 まとめ Deep dive into the internals of Snowflake Virtual Warehousesを読んでみました。 たぶん公開されていない内部の仕組みの割合が多いのかなと思いましたがどうでしょうか。 正直かなり難しくて、途中、ほとんど写経状態になっている部分もありますが、 なるべく分からないところを調べながら、何を言いたいのかを趣旨の理解に努めました。 正直、知らなくても問題ないし、公開されていない以上、実際は違うかもしれないし、 将来変更されてしまうかもしれません。 1週間ぐらいかけて読んでみて、公開されている仕様を説明しやすくなった気はしました。

default eye-catch image.

GCSとのストレージ統合を設定した話

SnowflakeはS3, Blob, GCSを外部ステージとして設定し、データをロードする機能を備えています。 Snowflakeは各クラウドストレージとの接続方法として「ストレージ統合」の使用を推奨しています。 今回、GCSとのストレージ統合を設定し、外部ステージのロード先としてみました。 その手順等を感想を混ぜつつ書いてみようと思います。 参考にしたSnowflake公式ドキュメントは以下です。 ストレージ統合の設定方法がステップbyステップで示されています。 Google Cloud Storageの統合の構成 この記事は、公式が説明するステップに従って書いていきます。 [arst_toc tag=\"h4\"] GCSとストレージ統合 一般に、各ストレージサービスと連携する際、接続に必要な接続情報が必要です。 接続情報を自力で保管し使用する場合、セキュリティリスクに晒されるため、 Snowflakeはよりセキュアな方法として「ストレージ統合」機能を提供しています。 現在、Snowflakeは「ストレージ統合」機能のみを推奨しています。 各クラウドプロバイダ毎に実装方法は異なり、GCSの場合、Service Accountを使用します。 クラウドプロバイダ間で概念が異なるため 「AWSでいうところのXXXだよね」 は誤解を生みますが、 簡単に言えば、AWSのIAM、Azureのサービスプリンシパル、が相当すると思います。(本当??) 以下の図はその概念図です。(公式直リンク) Snowflakeは、ストレージ統合オブジェクトをGCS用のサービスアカウントと紐付けます。 このサービスアカウントは裏でSnowflakeが作成します。 言い換えると、GCSの認証責任をSnowflakeが作成するサービスアカウントに委任します。 Snowflake上のロード・アンロード操作の裏で透過的にサービスアカウントの権限参照が行われます。 Snowflakeでストレージ統合を作成する Snowflake上でストレージ統合を作る方法は以下の通りです。 実行にはACCOUNTADMINロールか、CREATE INTEGRATION権限が必要です。 ストレージ統合を作成する段階では、GCP側の権限設定は不要です。 CREATE STORAGE INTEGRATION gcs_int TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = \'GCS\' ENABLED = TRUE STORAGE_ALLOWED_LOCATIONS = (\'gcs://mybucket1/path1/\', \'gcs://mybucket2/path2/\') STORAGE_BLOCKED_LOCATIONS = (\'gcs://mybucket1/path1/sensitivedata/\', \'gcs://mybucket2/path2/sensitivedata/\'); 自動生成されたサービスアカウントの情報を取得 CREATE INTEGRATIONによって作られたSnowflake用のサービスアカウント情報を取得します。 オブジェクトの詳細情報を取得する DESC コマンドにより取得できます。 DESC STORAGE INTEGRATION gcs_int; +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+ | property | property_type | property_value | property_default | +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------| | ENABLED | Boolean | true | false | | STORAGE_ALLOWED_LOCATIONS | List | gcs://mybucket1/path1/,gcs://mybucket2/path2/ | [] | | STORAGE_BLOCKED_LOCATIONS | List | gcs://mybucket1/path1/sensitivedata/,gcs://mybucket2/path2/sensitivedata/ | [] | | STORAGE_GCP_SERVICE_ACCOUNT | String | service-account-id@project1-123456.iam.gserviceaccount.com | | +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+ STORAGE_GCP_SERVICE_ACCOUNTプロパティが、作成されたサービスアカウントです。 バケットオブジェクトにアクセスするためのサービスアカウント権限を付与する 最後に、サービスアカウントがバケットへアクセスするための権限設定を行います。 ここで初めて、GCPコンソール上での操作が必要となります。 カスタムIAMロールの作成 以下の権限を持つIAMロールを作成します。今回はロードのみを行うため以下を設定します。 パージを行う場合はobjects.delte、アンロードを行う場合はobjects.createが必要です。 storage.buckets.get storage.objects.delete storage.objects.get storage.objects.list サービスアカウントへIAMロールを割り当てる GCPのやり方に従って、サービスアカウントへIAMロールを割り当てます。 バケットに対して、サービスアカウントのアクセスを許可し、その許可内容をIAMロールで指定します。 Azureと比較するとAWSとGCPは近いなと感じます。 外部ステージの作成 まず、ステージオブジェクトを格納するSnowflakeデータベースへの権限付与を行います。 GRANT USAGE ON DATABASE mydb TO ROLE myrole; GRANT USAGE ON SCHEMA mydb.stages TO ROLE myrole; GRANT CREATE STAGE ON SCHEMA mydb.stages TO ROLE myrole; GRANT USAGE ON INTEGRATION gcs_int TO ROLE myrole; 次にストレージ統合を指定して外部ステージオブジェクトを作成します。 USE SCHEMA mydb.stages; CREATE STAGE my_gcs_stage URL = \'gcs://mybucket1\' STORAGE_INTEGRATION = gcs_int FILE_FORMAT = my_csv_format; 最後に疎通試験を行います。 copy into testtable from @my_gcs_stage pattern=\'testdata.csv\'; まとめ Snowflakeの公式ドキュメントの通りにGCSとのストレージ統合を作成しました。 また、作成したストレージ統合上に外部ステージを設定し、ロードが出来ることを確認しました。

default eye-catch image.

デプロイメントについて調べてみた話(端折り気味)

dbt Fundamentalsの「Deployment」のセクションについて理解した内容を言葉にしてみた。 自動化を解決するのにdbt Cloudを使用した記事となっている。この記事は前回の続き。 [clink url=\"https://ikuty.com/2023/08/23/dbt_docs/\"] いったんdbtそのものの理解に留めたいので、かなり端折気味。 [arst_toc tag=\"h4\"] デプロイメントとは 以下、動画の意訳。 開発環境であればIDE上でコマンド(dbt run,dbt test)をAd-Hocに必要に応じて実行してきた。 本番環境へのデプロイメントは異なる。本番環境用に分離されたブランチ(main,master?)が存在し、 そして分離されたスキーマが存在する。(開発環境と異なり)意思決定に必要なデータが存在する。 また、本番環境では(Ad-Hocではなく)dbtコマンドをスケジュール実行する。 ビジネス要件に応じてdbt run, dbt testを実行することになる。 本番環境は、分離されたブランチ・スキーマ上においてSSOT(Single Source Of Truth)を実現する。 そして、これまで作業してきた開発環境を分離し、様々な変更を本番環境への影響なしに実行する。 Snowflakeの本番検証環境分離の戦略 これ、動画にないのだけれど、そもそもSnowflake側を検証環境と本番環境に分離しないといけない。 以前、以下の記事にマルチテナントに関係するホワイトペーパーを読んで内容を整理してみたが、 そこで、環境を分離するにあたって必要な考慮事項がまとめられている。 [clink url=\"https://ikuty.com/2023/03/29/snow-multitenant-design/\"] 概ね、テナント=環境と捉えて問題ないという感想を持っている。 OPT(Object Per Tenant)、つまり、同一アカウント内に検証/本番環境を共存させるスタイルと、 APT(Account Per Tenant)、つまり、アカウント毎に検証/本番環境を分離するスタイル、 両方あると考えている。 Snowflakeにおいて、アカウントを跨いでオブジェクトを共有できないことがポイントで、 それを不自由な制限事項と取るか、確実な安全の確保と取るか、どちらかの選択となる。 OPTであれAPTであれ、オブジェクト数の増加に伴いTerraform等のツールが事実上必須となる ことから、実はAPTでも事実上の共有をしているのと同じとなり、APTの方が良いのかなと思っている。 URLが分離するので運用の調整がしやすいし、第一に安全性にグレーな部分が無くなる。 dbt Cloud上でのデプロイメントの準備 dbt Coreの各コマンドをスケジュール実行することで機能を実現する、という特徴から、 そもそも検証/本番環境を実現するためにはdbt Coreだけでは機能が足りない。 dbt Cloudには、検証/本番環境の実現に必要な機能が乗っている。 なので、このDeploymentセクションはdbt Cloudの機能説明に近い。 ちょっと記事の趣旨から外れてしまうのでサラッと眺めるに留める。 dbt Cloud上のDeploymentメニュー配下には以下のメニューがある。 - Run History - Jobs - Environments - Data Sources Environmentsから、Deployment単位に対する操作が行える。 Create Environmentの操作により、新たなEnvironmentを作成できる。 - General Settings - Name - Type - dbt Version - [Check] Only run on cuustom branch データプラットフォーム(DW)との接続に必要な設定を行う。 DW側がアカウント分離方式で環境分離しているのであれば、ここで本番検証の差異が発生する。 - Deployment Connection - (Overwrite connection settings for this environment, only certain fields are able to be overwrite ) - Account - Role (Optional) - Database - Warehouse 接続に必要なCredentialsの設定を行う。環境分離されている粒度で接続を行う。 - Deployment Credentials - (Enter your deployment credentials here, dbt will use these credentials to connect to your database and run scheduled jobs in this environment ) - Auth method - Username - Password - Schema ジョブ実行時の設定を行う。タイムアウト設定や、都度ドキュメントを作るか、都度source freshnesを判定するか、 また、そもそも何のコマンドを実行するか。 - Execution Settings - Run Timeout - (Number of seconds a run will execute before it is canceled by dbt Cloud. Set to 0 riever time out for this job) - Defer to a previous run state ? - Generate docs on run - (Automatically generate updated project docs each time this job runs) - Run source freshness - (Enables dbt source freshness as the first step on this job, without breaking subsequent jobs) - Commands - (This is where you can pass whatever dbt commands you would like. So this could be dbt run, dbt tests, dbt seed, whatever it might be) 省略... - Helpful Resouruces - Enabling CI - Souurce Freshness dbtがどういったトリガでジョブを起動するかを設定する。 結構いろいろなことができる。スケジュール実行だけでなくWebhook、APIなんかも設定できる。 細かいところは省略。 - Triggers - Configure when and how dbt should trigger this job. - Schedulue - [Check] Run on Schedule - [Radio] Schedule Days - Subday,Monday,Tuesday,Wednesday,Thursday,Friday,Saturday - [Radio] Enter custom schedule - Timing - Every [??] hours (Starting at midnight UTC) - At exact intervals [??] UTC (e.g.\"0,12,23\" for midnight,noon,and 11PM UTC) - Webhooks - [Check] Run on Pull Requests ? - (省略) - API (省略) デプロイメント まとめ Historiesから、Jobの実行の実行履歴を観察できる。 以下、上からGit RepositoryからCloneした後、Snowflakeとの接続諸々を実行、 dbt deps (dbt Fundamentalsで出てきてないが、モジュール化されたパッケージのインストール)を実行。 まとめ dbt Fundamentalsの動画を聴いて、dbtのデプロイメント機能を追ってみた。 dbtそのものを追いたいので大分端折ってしまった。あまり記事に意味がないかもしれない。 自分の経験が追いついてきたらもう一度トライしてみたい。