default eye-catch image.

External Network Accessを使ってSnowflakeとFitbitAPIを繋いでみた話

FitbitはAPIがしっかり整備されていて、OAuth2 endpoint経由でデータが取り放題。 せっかくなので、話題のExternal Network Access(2023年12月現在 PuPr)を試してみようと思う。 つまり、FitbitAPI→Snowflakeをやってみようと思う。 Fitbit APIを使用するにはOAuth2.0 Authorizationを通す必要がある。 Snowflakeの公式にOAuth2.0 Endpoint経由でGoogle翻訳APIと連携する段取りが書かれていて、 それをそのままFitbit APIのものに差し替えるだけで動いた。 外部ネットワークアクセスの例 外部ネットワークアクセスについては以下。 [clink implicit=\"false\" url=\"https://docs.snowflake.com/ja/developer-guide/external-network-access/creating-using-external-network-access#label-creating-using-external-access-integration-network-rule\" imgurl=\"https://www.snowflake.com/wp-content/uploads/2017/01/snowflake-logo-color-300x69.png\" title=\"外部アクセス統合の作成と使用\" excerpt=\"特定の外部ネットワークロケーションへのアクセスを有効にするには、外部ロケーションのリストと使用を許可されるシークレットのリストを指定する外部アクセス統合を作成します。UDF の作成時、あるいは CREATEFUNCTION または CREATEPROCEDURE でプロシージャを作成する際に、 EXTERNAL_ACCESS_INTEGRATIONS 句を使用してこの統合を参照することで、ハンドラーコードが外部ロケーションとの認証コードにシークレットを使用できるようになります。\"] 2016年6月に書いた記事。phpで検証をしていた。 この辺りからバッテリーがダメになる度に新しいFitbit Charge(1,2,3)を買って溜めてきた。 この間、FitbitがGoogleに買われてしまったり、スマホアプリが大幅に変わったり、色々あった。 基本的な機能はずっと動いているので、7年分のデータが溜まっているんじゃないかな、と期待。 [clink url=\"https://ikuty.com/2016/06/07/fitbitapi-authenticate-grant-flow/\"] Fitbit API側の準備 OAuth2連携に必要な情報を dev.fitbit.com から取得する必要がある。 Authorization Code Grant Flow with PKCE こちらを参考にさせていただいた。 [clink implicit=\"false\" url=\"https://www.zenryoku-kun.com/post/fitbit-api#register-app\" imgurl=\"https://www.zenryoku-kun.com/home/sakura-400w.jpg\" title=\"FitbitのWeb APIを実行する方法\" excerpt=\"Fitbit Sense2を購入しました。はじめてのスマートウォッチです。Fitbitデバイスでは、心拍数や歩数等、収集したデータをWeb APIで取得することが可能です。さっそく使って遊んでみようと思ったら、Web APIの認証がなかなか通らない、、、ドキュメントはとても充実しているのですが、OAuth2.0の認証パターンがImplicit Grant Flowの場合、Authorization Code Grant Flowの場合、PKCEを使う場合、、、などなど、情報量がとにかく多く混乱してしまいました。何はともあれ、何とか認証を通して、こんな感じで歩数などのアクティビティ情報や、心拍数や血中酸素濃度(SpO2)を取得することが出来ました。\"] 以下を準備すればOK。 access-token refresh-token client-id Snowflakeでリソース作り Snowsightでポチポチとリソースを作っていく。 USE ROLE SYSADMIN; -- 外部ロケーションを表すネットワークルールの作成 -- CREATE OR REPLACE NETWORK RULE fitbit_apis_network_rule MODE = EGRESS TYPE = HOST_PORT VALUE_LIST = (\'api.fitbit.com\'); -- 外部ロケーションとの認証に必要なOAuth認証情報を保持するセキュリティ統合の作成 -- CREATE OR REPLACE SECURITY INTEGRATION fitbit_api_oauth TYPE = API_AUTHENTICATION AUTH_TYPE = OAUTH2 OAUTH_CLIENT_ID = \'\' OAUTH_CLIENT_SECRET = \'\' OAUTH_TOKEN_ENDPOINT = \'https://api.fitbit.com/oauth2/token\' OAUTH_AUTHORIZATION_ENDPOINT = \'https://www.fitbit.com/oauth2/authorize\' ENABLED = TRUE; -- セキュリティ統合に含まれる認証情報を表すシークレットの作成 -- CREATE OR REPLACE SECRET fitbit_api_oauth_token TYPE = oauth2 API_AUTHENTICATION = fitbit_api_oauth OAUTH_REFRESH_TOKEN = \'\'; 最後に外部アクセス統合を作成する。 ストレージ統合や、Notification統合など、統合の作成にはACCOUNTADMINが必要で、 同様に外部アクセス統合の作成にはACCOUNTADMINが必要とのこと。 USE ROLE ACCOUNTADMIN; CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION fitbit_apis_access_integration ALLOWED_NETWORK_RULES = (fitbit_apis_network_rule) ALLOWED_AUTHENTICATION_SECRETS = (fitbit_api_oauth_token) ENABLED = TRUE; 外部ロケーション(ネットワーク)にアクセスするUDFsを書くロールを作成する。 UDFsを書く際に、シークレットを参照する必要がある。 UDFsを書けるロールにシークレットのREAD権限を付与しておく必要がある。 以下、そのままでは SECURITYADMINがDB・スキーマに触れないので環境により修正が必要。 USE ROLE USERADMIN; CREATE OR REPLACE ROLE ikuty_fitbitapi_developer; USE ROLE SECURITYADMIN; USE SCHEMA IKUTY_DB.PUBLIC; GRANT READ ON SECRET IKUTY_DB.PUBLIC.fitbit_api_oauth_token TO ROLE ikuty_fitbitapi_developer; GRANT USAGE ON INTEGRATION fitbit_apis_access_integration TO ROLE ikuty_fitbitapi_developer; GRANT ROLE ikuty_fitbitapi_developer TO role SYSADMIN; 本体の実装 PythonでOAuth2 Endpoint経由でFitbit APIにGETリクエストを投げるFunctionを書く。 最初、トークンのexpire時のrefreshを自力で書いていたが、get_oauth_access_token(\'cred\')により、 自動的にrefreshしてくれていることに気づいた。 use role sysadmin; use schema IKUTY_DB.PUBLIC; CREATE OR REPLACE FUNCTION fitbit_python() RETURNS STRING LANGUAGE PYTHON RUNTIME_VERSION = 3.8 HANDLER = \'hello_fitbit\' EXTERNAL_ACCESS_INTEGRATIONS = (fitbit_apis_access_integration) PACKAGES = (\'snowflake-snowpark-python\',\'requests\') SECRETS = (\'cred\' = fitbit_api_oauth_token ) AS $$ import _snowflake import requests import json def hello_fitbit(): with requests.Session() as s: access_token = _snowflake.get_oauth_access_token(\'cred\') url = \"https://api.fitbit.com/1/user/-/activities/steps/date/today/1m.json\" res = s.get(url,headers={\"Authorization\": \"Bearer \" + access_token}) res_data = res.json() return res_data $$; 実行結果は以下。1日毎の歩数を1ヶ月分取得できた(恥...)。 select parse_json(fitbit_python()); { \"activities-steps\": [ { \"dateTime\": \"2023-11-23\", \"value\": \"15570\" }, { \"dateTime\": \"2023-11-24\", \"value\": \"5392\" }, { \"dateTime\": \"2023-11-25\", \"value\": \"8993\" }, { \"dateTime\": \"2023-11-26\", \"value\": \"10525\" }, { \"dateTime\": \"2023-11-27\", \"value\": \"6371\" }, { \"dateTime\": \"2023-11-28\", \"value\": \"2713\" }, { \"dateTime\": \"2023-11-29\", \"value\": \"9252\" }, { \"dateTime\": \"2023-11-30\", \"value\": \"0\" }, { \"dateTime\": \"2023-12-01\", \"value\": \"7947\" }, { \"dateTime\": \"2023-12-02\", \"value\": \"11265\" }, { \"dateTime\": \"2023-12-03\", \"value\": \"8557\" }, { \"dateTime\": \"2023-12-04\", \"value\": \"2366\" }, { \"dateTime\": \"2023-12-05\", \"value\": \"7985\" }, { \"dateTime\": \"2023-12-06\", \"value\": \"8109\" }, { \"dateTime\": \"2023-12-07\", \"value\": \"6852\" }, { \"dateTime\": \"2023-12-08\", \"value\": \"3707\" }, { \"dateTime\": \"2023-12-09\", \"value\": \"12640\" }, { \"dateTime\": \"2023-12-10\", \"value\": \"7122\" }, { \"dateTime\": \"2023-12-11\", \"value\": \"7190\" }, { \"dateTime\": \"2023-12-12\", \"value\": \"8034\" }, { \"dateTime\": \"2023-12-13\", \"value\": \"5228\" }, { \"dateTime\": \"2023-12-14\", \"value\": \"2861\" }, { \"dateTime\": \"2023-12-15\", \"value\": \"6785\" }, { \"dateTime\": \"2023-12-16\", \"value\": \"11720\" }, { \"dateTime\": \"2023-12-17\", \"value\": \"11021\" }, { \"dateTime\": \"2023-12-18\", \"value\": \"0\" }, { \"dateTime\": \"2023-12-19\", \"value\": \"11021\" }, { \"dateTime\": \"2023-12-20\", \"value\": \"0\" }, { \"dateTime\": \"2023-12-21\", \"value\": \"2703\" }, { \"dateTime\": \"2023-12-22\", \"value\": \"3336\" }, { \"dateTime\": \"2023-12-23\", \"value\": \"7497\" } ] } 結論 PuPrのExternal Network Accessを使用して、FitbitAPI→Snowflakeが出来ることを確認した。 (途中、自動的にトークンをrefreshしてくれている、と書いたが、何度かExpireさせないと良くわからない。) 相手がOAuth2.0ならとても簡単に繋ぐことができると思う。 次は、せっかくなのでSiS(Streamlit in Snowflake)で可視化してみたりしたい。

default eye-catch image.

Deep dive into the internals of Snowflake Virtual Warehousesを読んでみた

この記事はSnowflake Advent Calendar 2023シリーズ2の19日目です。 今年はSnowProAdvanced: Architect試験に合格できました。 結局のところ資格試験であるという側面はあるものの、いろいろ役立っている実感があります。 その後、Mediumというメディアで気になる記事を読み漁る、みたいなことを始めました。 正直知らないことばかりです..。 いくつか読んだ記事のうち、これはヤバいなと感じた記事の読書感想文を書こうと思います。 [clink implicit=\"false\" url=\"https://medium.com/snowflake/deep-dive-into-the-internals-of-snowflake-virtual-warehouses-d6d9676127d2\" imgurl=\"https://miro.medium.com/v2/resize:fit:1002/format:webp/0*6KqDj8Y_HxeL11xT.png\" title=\"Deep dive into the internals of Snowflake Virtual Warehouses\" excerpt=\"Snowflake’s Data Cloud provided as Software-as-a-Service (SaaS), enables data storage, processing, analytic solutions, Machine Learning, and running apps & services in a performant, easy-to-use, and flexible manner using “virtual warehouses” which is the primary compute primitive in Snowflake. This post details the internals of virtual warehouses which provide elastic, highly available, and fully managed mechanisms to run a variety of customer workloads on multiple cloud service providers.\"] 訳は間違っているところもあると思います。ご容赦ください。 [arst_toc tag=\"h4\"] 仮想ウェアハウスの基本 まず、コンピュートとストレージが分離し、それぞれ独立してスケールできることが特徴としている。 Snowflakeにおいて、仮想ウェアハウスはコンピュートの最小単位ではあるが、仮想ウェアハウスは 複数のVMからなるMPPクラスタであると言及している。 この記事は、仮想ウェアハウスを説明するために仮想ウェアハウスを構成するVMに言及している。 仮想ウェアハウスの下に物理のVMがいることにフォーカスがあてられている。 SnowflakeのSaaSサービスを実現するコードはMPPクラスタを構成する各VMで動いていて、 ジョブ実行の際、各VMはリソースを直接参照するしVM同士でmeshN/Wを構成して資源を共有する。 (後述) 仮想ウェアハウス同士はストレージを共有しないけれど、仮想ウェアハウス内部のVMは むちゃくちゃ密に連携しあって、計算資源もストレージも共有しあう。 このセクションで、仮想ウェアハウスの設計方針が述べられている。 「可能な限り顧客に選択肢を提供するのを避けSnowflakeがベストを考える」が基本方針である一方、 「仮想ウェアハウスを構成するVMの物理資源を変更できる柔軟性を提供する」と言っている。 以降、仮想ウェアハウスを構成するVMの振る舞いについて書かれている 仮想ウェアハウスのサイズとタイプ 仮想ウェアハウスのタイプはCPUとメモリの比率、サイズはCPUとメモリの総量を決める。 タイプは、StandardとSnorpark-optimizedの2種類。 Snowpark-optimizedは、Standardの16倍のメモリ量と10倍のSSDを持つ。 メモリ増量により計算が高速化する。ストレージが大きいとキャッシュや中間生成物が 後続の実行で再利用され高速化する。 中間生成物の書き込みに対し、第1に仮想ウェアハウス上のVMのメモリが使われる。 メモリを使い切ったとき、VMのローカルSSDが使われる。 SSDも使い切ったとき、S3等のリモートストレージが使われる。 QUERY_HISTORY viewにSSD、リモートストレージにスピルした量を出力するので、 メモリが溢れないようにするか、少なくともSSDには乗るようにサイズを増やせよ、と言っている。 (やはりストーリーがストレートでわかりやすい..) SELECT QUERY_ID ,USER_NAME ,WAREHOUSE_NAME ,WAREHOUSE_SIZE ,BYTES_SCANNED ,BYTES_SPILLED_TO_REMOTE_STORAGE ,BYTES_SPILLED_TO_REMOTE_STORAGE / BYTES_SCANNED AS SPILLING_READ_RATIO FROM \"SNOWFLAKE\".\"ACCOUNT_USAGE\".\"QUERY_HISTORY\" WHERE BYTES_SPILLED_TO_REMOTE_STORAGE > BYTES_SCANNED * 5 - Each byte read was spilled 5x on average ORDER BY SPILLING_READ_RATIO DESC ; マルチクラスタ ウェアハウス マルチクラスタは、ジョブの同時実行性を高めるためにクラスタを静的/動的に追加する仕組み。 クラスタ内のVMは相互に関係し合いリソース共有して複数台でジョブのオフロードを行うため、 単一クエリのパフォーマンスアップに寄与する。一方で、クラスタ間はリソース共有しないため、 増えたクラスタ内のVMはジョブのオフロード先の融通にはならず、同時実行時の性能劣化予防に働く。 他にスケーリングポリシーの話や、Min/Max設定による静的/動的追加の話が書かれているが省略。 UpではなくOutの方が費用対効果が高い例として、interleaved workloadsが挙げられている。 Outで増やしたクラスタがダラダラと回り続けるケースが除外できず理論値ではあるけれども、 Upに対するOutのメリットを言う場合に説明しやすい図だなと思った。 この辺りモヤモヤしていたのでバシっと説明してもらえて助かりました。 柔軟性-ステートレスなスケーリング 需給調整の文脈ではなく、自動起動と自動サスペンドの文脈で仮想ウェアハウスの状態が書かれている。 リソースがステートレスであれば、需要の増減と関係なくリソースを増減できる。 仮想ウェアハウスはステートレスリソースであって、需要の発生によりプロビジョンングされ、 需要の消滅により仮想ウェアハウスに紐づくリソースが破棄される。 仮想ウェアハウスにジョブが送信されると、クラスタ内のVMはジョブ実行中にのみ存続するプロセスを 生成する。プロセスが失敗した場合、自動的に再試行される。 ユーザとウェアハウスは多対多の関係であり、ウェアハウスから見ると同時に複数の需要が発生する。 異なる組織・部署がウェアハウスを使用するケースにおいて、ウェアハウスは同時にそれぞれを処理する。 各々のウェアハウスは同じ共有テーブルにアクセスできるが、その際、データのコピーをウェアハウス内に 持たなくても良いように作られているので、各組織・部署の処理が他の組織・部署に暴露されるリスクを 回避できるようになっている。 異なる組織・部署が実行したジョブがウェアハウス上で相互作用しない、という事実があり、 組織・部署から見れば、他の組織・部署に全く影響されず自由にウェアハウスを利用できるという 書き方になっていて、ちょっと抽象度が高いですが「ステートレス」が説明されていました。 柔軟性-マルチクラスタ オートスケーリング スケーリングポリシーの説明。 スケーリングポリシーの設定により、各クラスタの自動起動・シャットダウンの相対的な速度を制御する。 スタンダードポリシーはクレジット消費削減よりもクラスタ追加を優先し、クエリ所要時間を最小化する。 エコノミーポリシーの設定により、クラスタを追加するよりも現在実行中のクラスタを全開で回すことが 優先され、結果としてクエリがキューに入りやすくなり所要時間が延びるが、クレジット消費は減る。 この説明は公式通り。 柔軟性-ゼロへのスケール Auto-resumeとAuto-suspendの説明。 ウェアハウスに対する需要がなくなって一定期間経ったら自動的に停止する。 ウェアハウスに対する需要が発生したら自動的に再開する。その時間等を調整できる。 これらの設定はクラスタではなくウェアハウスに対して設定する。 これも説明は公式通り。需要がなくなったら1個も起動していない状態にできることが主張ポイント 柔軟性-自動Suspend期間の管理 Suspendは、つまり仮想ウェアハウスを構成するVMのリリースなので、VMが持つSSDに 蓄えられたキャッシュは同時に破棄されてしまう。これは、後続のジョブが発生したときに クエリ結果キャッシュが効かなくなることに繋がる。 公式の通り、「ウェアハウス稼働時間(クレジット消費)」と「クエリパフォーマンス」がトレードオフの 関係となる。需要がなくなってすぐにウェアハウスを止めると確かにクレジット消費は減るが、 キャッシュヒット率が下がる。トレードオフにSweet spotがあるので探しましょうと書かれている。 これに留まらず、どういう風に決めたら良いかガイドが書かれている。 ただ、これは答えが無い問題で、実験してねとも書いてある。 - タスク実行、ロード、ETL/ELTユースケースにおいて、すぐに止めた方が良い。 - BI等SELECTが起きるユースケースは、止めるまで10分待つべき。 - DevOps,DataOps,Data Scienceのユースケースは、停止時間は5分が最適。 とりあえず、タスク実行、ロードでは、自動Suspend期間を持たせる意味はないので、 そこは、バッサリ最速で落とす勇気が出る書き方で参考になりました。 全てのクエリのうち、SSDからスキャンした割合を集計するクエリは以下。 この割合が低いということは、ウェアハウスのSuspendが早すぎることを示している。 SELECT WAREHOUSE_NAME ,COUNT(*) AS QUERY_COUNT ,SUM(BYTES_SCANNED) AS BYTES_SCANNED ,SUM(BYTES_SCANNED*PERCENTAGE_SCANNED_FROM_CACHE) AS BYTES_SCANNED_FROM_CACHE ,SUM(BYTES_SCANNED*PERCENTAGE_SCANNED_FROM_CACHE) / SUM(BYTES_SCANNED) AS PERCENT_SCANNED_FROM_CACHE FROM \"SNOWFLAKE\".\"ACCOUNT_USAGE\".\"QUERY_HISTORY\" WHERE START_TIME >= dateadd(month,-1,current_timestamp()) AND BYTES_SCANNED > 0 GROUP BY 1 ORDER BY 5 ; 柔軟性-ウェアハウス内のVMは起動済みVMのプールから割り当てられる VMをコールドから起動するには10秒オーダーの時間がかかる。そもそも小規模のクラウドサービスでは VMの数が不足して流動性がない場合もあり、起動済みのVMをプールして再利用することで、 これらの問題を解決しようとしている。 Snowflakeは、VMの起動、終了、停止、再開、スケーリング等のオペレーション時間に対して、 内部でサービスレベル目標を設けている。 (これらの時間がサービスレベル目標から外れるとSnowflake内部でインシデント管理されるらしい。) ユーザのリクエストで需要が発生した場合、起動済みVMのプールからVMが選ばれ、 ウェアハウスに割り当てられる。 起動済みVMのプールのサイズは、過去の需要のベースラインとスパイクから予測されているらしい。 確かにウェアハウスが瞬時に起動する仕組みが気にはなっていました。 妥当な仕組みで成立しているようですが、言及されている点がポイントかと思います。 柔軟性-需要のバーストに対して用意されるQAS サイズアップの他にQAS(Query Acceleration Service)というサービスが存在する。 起動済みVMプールにあるVMを需給に応じて自動的にウェアハウスに組み入れる。 ウェアハウス内でVMは密に連携してクエリをオフロードし合う。 動的なサイズアップであって、疎連携のマルチクラスタとは異なる。 QASは主に、巨大なテーブルのScanや、burstyなワークロードを目的とする。 QASを使用すると、大規模なクエリが検知された場合にウェアハウス内のVMが ウェアハウスから離れ、他のユーザの小規模なクエリに使われるらしい。 通常はウェアハウスのサイズアップよりも低いコストで目的を達成できるそう。 この手の機能が何故ワークロードを高速化するのか、結局のところ中身を知らないとわからないと 思うので、機能の説明の他に、どういう作りなのかを書いてくれるととても参考になる気がする。 When to useはburstyなワークロードということ。 QASで恩恵を受けられるクエリがどれぐらいあるか気になるところ。 公式によると以下の特徴を持つクエリはQASの恩恵を得られないそう。 フィルターや集計(つまり、 GROUP BY)がない。Query Acceleration Serviceは現在、このようなクエリを高速化できません。 フィルターの選択性が十分ではない。または、 GROUP BY 式のカーディナリティが高くなっている。 十分なパーティションがない。スキャンするために十分なパーティションがないと、クエリアクセラレーションの利点は、サービス用に追加のサーバーを取得する際の待機時間によって相殺されます。 クエリに LIMIT 句が含まれている。ただし、 ORDER BY 句を含んでいる LIMIT 句はサポート されます。 QASの恩恵を得られるクエリとウェアハウスは以下のビューから探すことができる。 -- アクセラレーションの対象となるクエリ実行時間の量によって、 -- サービスから最もメリットを受ける可能性のあるクエリを識別します。 SELECT query_id, eligible_query_acceleration_time FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ACCELERATION_ELIGIBLE ORDER BY eligible_query_acceleration_time DESC; -- Query Acceleration Serviceの特定の期間中、 -- 対象となるクエリが最も多いウェアハウスを識別します。 SELECT query_id, eligible_query_acceleration_time FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ACCELERATION_ELIGIBLE WHERE warehouse_name = \'mywh\' ORDER BY eligible_query_acceleration_time DESC; QASにより、ウェアハウスは需給調整のためにVMをリース(借りる)する、という表現がある。 ウェアハウスがリースできるVMの数の最大値は、Scale Factorという数値で表される。 要は、通常のウェアハウスサイズで確保するVMの数の何倍のVMをリースできるか。 例えば、Scale Factorが5、VMのサイズがM(つまり4credsits/hour)の場合、 4*5=20 credits/hourまで増強することになる。 Scale FactorはQUERY_ACCELERATION_ELIGIBLEビューにあり、 クエリID単位で知ることができる。 SELECT MAX(upper_limit_scale_factor) FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_ACCELERATION_ELIGIBLE WHERE warehouse_name = \'mywh\'; 仮想ウェアハウスのジョブスケジューリング スループット最大化、レイテンシ最小化、クラスタ使用率最大化、異なる需要に対して供給のために、 ウェアハウスの負荷を追跡・調整するウェアハウススケジューリングサービス(WSS)が備わっていて、 クエリがクラウドサービスレイヤでコンパイルされた後、WSSがジョブスケジューリングを行う。 WSSは各VMのCPU・メモリ使用量を追跡する。ウェアハウスのメモリキャパシティは、 各VMの実効メモリ(OSやソフトウエアの使用を除く)にウェアハウス内のVMの数を掛けたもの。 メモリが使い果たされたことを検知して、データをdiskに吐き出す(Spill)。 メモリ負荷が高くなりすぎると、VMは落とされて\"リタイア\"(前述)する場合がある。 情報科学の用語の1つにDOP(Degree Of Parallelism)がある。 WSSは1個のジョブを何個のプロセスで同時処理して完了するか、という制御を行なっているらしい。 VMのCPUコアが1つのプロセスを受け持ち、CPUコアの数だけプロセスを並列実行できる。 例えばCPUコアを8個もつVMを4個もつウェアハウスの保持コア数は合計32個。 1つのジョブを32コアで並列処理しても良いし、逆に32個のジョブを1コアで処理しても良い。 DOPはコンパイル時に推定される。 以降、ジョブスケジューリングの少し詳しい説明が書かれている。 実行中の各ウェアハウスは既にキューにジョブが積まれている。 その上で新しいジョブを処理する場合、どのウェアハウスで処理すべきかを決めることになる。 WSSはウェアハウスの全てのVMに均等に負荷分散されるべき、という仮定を立てる。 クラウドサービスレイヤは、ジョブの処理に必要なメモリとコンパイル時に決まったDOPから、 そのジョブをどのウェアハウスで処理するかを決める。 メモリの使用状況や同時実行性(?、キューに積む時点でジョブがどれぐらい並列実行されているか??) を見て、ウェアハウスの適格性を決める。適格性が同じなら、その時点で同時実行ジョブが最も少ない ウェアハウスを選択する。適格なウェアハウスが無い場合、WSSキューに残り続ける。 ジョブスケジュールを行うと、各ウェアハウスのリソース使用状況バランスが変化する。 WSSはクラウドサービスにVM使用状況のレポートを送る。 クラウドサービスは状況次第でDOPを下げる(より少ない並列度で処理するよう計画される)。 DOPを下げた後、ジョブはウェアハウスで実行される。ジョブ終了後リソースは解放される。 負荷に応じてDOPがダイナミックに調整されている様が書かれている。 実際のところ、DOPの推移を観察することはできないのと、DOPの上げ下げとパフォーマンスの 関連が本当にその通りなのか不明なこともあり、結局良くわからない。 並列レベルの制御 MAX_CONCURRENCY_LEVELパラメタにより、最大並列処理数を設定できる。 デフォルト値は8ということなので、最大で4個のジョブを並列実行することになる。 巨大なクエリを処理する場合、1個のジョブを受け持つコア数を増やすことでスループットが上がる 場合があるらしい。並列処理数が下がるとキューに積まれるジョブが増えることに繋がる。 ウェアハウスサイズを増やさずにMAX_CONCURRENCY_LEVELだけ調整しても、 リソースの総量は変わらないはずだし、簡単に最適値が見つかるなら全自動で決めてくれる のだろうから、きっと難しい話なのだろう。QASみたいに全然違う何かを使うと良いよ、と書かれている これは公式の以下のドキュメントが対応する。 同時実行クエリの制限 リソースモニタと使用量制限 クレジットを想定よりも多く消費しないようにするアラートとハードリミットの仕組み。 消費クレジットが制限を超えたことをトリガにアラート、自動停止を実行できる。 リソースモニタが設定されていないウェアハウスを以下のクエリで見つけて設定せよとのこと。 SHOW WAREHOUSES ; SELECT \"name\" AS WAREHOUSE_NAME ,\"size\" AS WAREHOUSE_SIZE FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())) WHERE \"resource_monitor\" = \'null\' ; ウェアハウスの負荷とサイズの決定方法 Snowsightでウェアハウスの負荷を確認できる。これの計算方法などが書かれている。 確かに、あれ、何をどうやって集計したチャートなのか知らなかった。 Snowflakeが出力するメトリクスを見てウェアハウスの正しいサイズを決定せよとのこと。 ウェアハウスのジョブ負荷メトリクスは、一定期間内の実行ジョブ数、キューに入ったジョブ数の 平均である、とのこと。実行ジョブ数の平均は、全てのジョブの実行時間(秒)を期間(秒)で 割った値であるとのこと。これはバーの青色の部分だな。 Private Previewで、ウェアハウスの使用率メトリクスが用意されるらしい。 以下の表のように、ウェアハウス単位、クラスタ単位で100分率の値を得られる。 ウェアハウス負荷や使用率によって、キャパシティ割り当てを行うべきとのこと。 どういう数字だったらどうすべきか書かれている。そういえば知らなかった。 ワークロードのスループット・レイテンシが適切で、キューに入ったクエリが少なく、 長期にわたりクエリ負荷が1未満、かつ、使用率が50%を切る場合、 ウェアハウス・クラスタのダウンサイズを検討する。別のウェアハウスを起動し、 キューに入れられたジョブをそのウェアハウスで実行できるようにする。 ワークロードのスループット・レイテンシが期待よりも低速で、かつ、 クエリ負荷が低く、かつ、使用率が75%を超えるなど高い場合、 ウェアハウスのアップサイズを検討するか、クラスタの追加を検討する。 使用量の急増(スパイク)が繰り返し発生する場合、 ウェアハウスの追加・クラスタの増量を行い、スパイクに対応するクエリをそれに移す。 スパイク以外のクエリを小さいウェアハウス・クラスタで実行されるようにする。 ワークロードが通常よりも大幅に高い場合、 どのジョブが負荷に寄与しているのか調査する。 ウェアハウスが定期的に実行される(スパイクではない)が、かなりの期間にわたって 合計ジョブ負荷が1未満である場合、 ウェアハウスのサイズダウン、クラスタの削減を検討する。 ストレージ・キャッシュ-ストレージアーキテクチャ Snowflakeには、テーブルの永続化、JOIN等のクエリ演算子によって生成されクエリの実行中に消費される 中間データの2つの形式のストレージがある。 永続化テーブル 寿命が長い永続化テーブルは、S3等のオブジェクトストレージが使われる。 オブジェクトストレージは比較的スループットが高くないが、長期間保管する際の可用性要件が良い。 S3等のブロックストレージに対して一括上書きすることになるが、immutableなデータを 扱うには適している。ブロックストレージの上でimmutableなデータの水平展開を行う。 (別のMedium記事で、micro-partitionはテーブルのバージョニングであって、immutableな データ領域を重ねていくことと、その仕組みにより副作用的にTime-Travelが用意されることが 書かれている。micro-partitionがブロックストレージ上で増えていく様は面白い) immutableなファイルには列データ、属性データがグルーピング・圧縮され格納されている。 相対位置が付与されていて再構成しやすい。 ブロックストレージに備わっている「部分的な読み取り」機能により、これらのファイルの 必要な部分を取得する。こうして永続化テーブルがブロックストレージに保管・使用される。 JOIN等のクエリ演算子によって生成されクエリの実行中に消費される中間データ 中間データは寿命が短く低レイテンシ・高スループットが求められる。 ジョブの実行にウェアハウスのメインメモリとSSDが使われる。 これらはウェアハウスの開始時に作られ、終了時に破棄される。 これらの一時ストレージは、リモートにある永続化テーブルのライトスルーキャッシュとして機能する。 各仮想ウェアハウスはそれぞれ個別に一時ストレージを持ち、クエリ実行時に使用される。 この一時テーブルは、全ての仮想ウェアハウスから\"個別にコピーすること無しに\"共有できる。 メモリ管理を単純化するためのSpill 中間データの書き込み操作の際に、まずウェアハウス内のメインメモリが使われる。 メインメモリがfullになると、ウェアハウスのローカルdisk(SSD)が使われる。 ローカルdiskがfullになると、リモートストレージが使われる。 メモリ不足、ディスク不足を回避するための仕組みになっている。 事実としては良く知られた挙動だけれども、それと「メモリ管理の単純化」というストーリーが 紐づいて理解しやすくなった気がする。 ストレージ・キャッシュ-キャッシュ戦略 「キャッシュ」とは、良く使うデータを取り出しやすいところに一時的に保存しておくもの。 キャッシュ容量は限られるため、ヒット率を維持しつつ効率的に中身を更新することが重要。 その具体的な仕組みとして、LRU (Least Recently Used)、LFU (Least Frequently Used)が有名。 キャッシュが必要な中間データ(前述)量が小さい場合、一時ストレージレイヤ(=VMのdisk)は、 ファイル名のハッシュ値を使ったLRUキャッシュにより、頻繁にアクセスする永続化データの キャッシュとして使われる。このキャッシュは低優先度で\"lazy\"に行われるらしい。 ファイルが仮想ウェアハウスのどのVMにストアされるかについて「一貫性」が言われている。 一方向関数にファイル名を食わせた結果、ファイル名とストア先VMが決まることを言っている。 サイズ変更によってVMの追加・削除が行われる際にキャッシュがシャッフルされてしまわない。 (VMのサイズが同じならば)永続化ストレージ上のファイルは特定のVMに保存されるため、 永続化ストレージ上のファイルに対する操作は、そのファイルのハッシュが保存されるVMが 実行するようにスケジューリングされる。こうして、ジョブの並列化はファイルのハッシュ値が 一貫して同じVMに保存されることと密接に結びついている。 ファイル名が偏っているとハッシュも偏り、保存先のVMが偏る場合がある。 それを回避するため、ワークロードがそのVMでの所要時間が他のVMでの所要時間よりも 小さいかどうか、に基づいてクラスタ内のVM内でロードバランシングが行われる。(え..?) キャッシュ(execution artifacts)が移動した場合(キャッシュアウトした場合)、 最初に実行がスケジュールされていた既に過負荷になっているVMの負荷がさらに増加する のを避けるため、操作の実行に必要なファイルが永続化ストレージから読み取られる。 仮想化の問題、ネットワークの問題など様々な理由で一部のVMが極端に遅い時があるらしい。 その対策にもなっているらしい。 Snowflakeのスケジューリングロジックは、execution artifactsを永続化ストレージ のキャッシュ先と同じVMに配置することと、全てのexecution artifactsを少数のVMに 配置することの間のバランスを見つけようとする。 前者は永続化ストレージのReadに伴うネットワークトラフィックの最小化を目指すが、 ジョブがウェアハウス内の全てのVMにスケジューリングされることによって中間データが VM間でやり取りされることに起因してネットワークトラフィックが増加するリスクもある。 後者は中間データ交換のためのネットワークトラフィックがなくなる(減る..?)が、 永続化ストレージのReadのためのネットワークトラフィックが増加する可能性がある。 一時データ容量はリモートの永続化ストレージ容量よりもかなり小さい(平均0.1%未満) にも関わらず、Snowflakeのキャッシュスキーム上では、Readのみのクエリで-80%、 Read-Writeがあるクエリで-60%のキャッシュヒット率にもなるらしい。 文章だけでは読みきれないな..。ただキャッシュの仕組みが書かれているだけでなく、 永続化ストレージ上のデータ(=ファイル)をVMに持ってくる仕組みの説明になっていて、 ウェアハウス内のVMで負荷分散して処理していく様が薄ら分かった気がする。 マルチテナント環境におけるセキュリティとリソース分離 アカウント、ジョブごとにデータを分離し、アカウント、ジョブ間でデータが漏洩しないように 設計している。\"仮想マシンを分離すること\"により、各テナントの分離を実現している。 さらに、cgroup、カーネル名前空間、seccomp(※)のようなDockerコンテナに似たカーネルプリミティブ を備えたVM内のサンドボックスにより、同一顧客アカウント内のジョブ間の情報漏洩を防ぐ。 ※cgroup,カーネル名前空間,secompはLinuxカーネルの機能で、 Dockerコンテナの内部で使われている。 cgroup,namespaceは、プロセスグループのリソース(CPU、メモリ、ディスクI/Oなど)の利用を 制限・隔離するLinuxカーネルの機能とのこと。seccompは自プロセスが発行するシステムコールを 制限してプロセスを乗っ取られたとしても被害を最小限にする機能とのこと。 各VMを独自のハードウェア、ページテーブル、カーネルを使用して動作させることで、 マルチテナントセキュリティとリソース分離を図っている。 VMが同じハードウェア、ページテーブル、カーネルを使用した\"VM分離\"がない場合、 従来から使われているカーネルカーネル共有方式(cgroup,名前空間,secomp付き)だけでは、 Snowflakeのセキュリティ基準に達しないと判断したそう。(そうですか..)。 \"VM分離\"するよりもカーネルを共有した方が、コンテナは高速に起動して都合が良いけれども、 カーネルを共有するということは、過去のCVEsから予想されるセキュリティ脆弱性に曝露される ことになる。 仮想ウェアハウスを構成するVMはそのウェアハウスが占有するプライベートなリソースであって、 仮想ウェアハウス間で共有されたりはしない。加えて仮想ウェアハウスはステートレス。 データの状態に影響されず、需要に応じてどんな時でも作成・破棄・リサイズできる。 その仕組みのため、ジョブが特定の仮想ウェアハウスで限定して実行されるから、 その仮想ウェアハウスのパフォーマンスが他の仮想ウェアハウスのパフォーマンスに影響しない。 ジョブ実行の際、各仮想ウェアハウス内のVMが新しいプロセスを起動する。 そのプロセスはジョブの実行期間中にのみ生存する。 プロセスの失敗は自動的に検知され即座に修正(再実行)される。 ユーザは、いつでも複数の仮想ウェアハウスを実行できる。 各ウェアハウス上で、複数のジョブが並列実行する。 ネットワークセキュリティ 仮想ウェアハウスは次の外部ネットワークアクセスを必要とする。 クラウドサービスレイヤとの通信 ジョブ実行時に発生する他の仮想ウェアハウスとのデータ共有 ローカルのクラウドストレージ(diskのspill先)へのアクセス API Gatewayへのアクセス Snowflakeは全ての仮想ウェアハウスからのネットワークトラフィックを信用しない。 内部サービスへのトラフィックは必ず認証済みのエンドポイントを経由する。 外部ネットワークへのトラフィックは外向きプロキシを経由し、アクセス制御ポリシーが適用される。 未認証のエンドポイントへのアクセスはブロックされ、予期しない動きはSnowflakeに報告される。 アカウント間で予期しない漏洩が起こらないように、VM、proxy、ジョブ間でやり取りされる全ての 通信が正常であることを、クラウドサービスレイヤがIPアドレスマッチングを行うことで検証する。 仮想ウェアハウスが持つ署名済みの共有シークレットを使って、仮想ウェアハウス間の全ての通信 について、発信・着信側が本当にSnowflake内部の仮想ウェアハウスであるか検証する。 そもそも仮想ウェアハウスからクラウドサービスレイヤへの通信がむちゃくちゃ多くなり、 DoS攻撃のようにならないように、通信にレートリミットがついていたりするらしい。 他には、フローログを使って何かをしているらしい。フローログって何か知らなかったので調べた。 NWインターフェース間で行き来するIPトラフィックに関する情報をキャプチャする機能。とか。 Wireshakみたいなやつだろうか。例えば、仮想ウェアハウス内のVMが知らないdestに対して 送ったIPトラフィックを見つけてforensic inspectionを行いVMを隔離するなど。 ※デジタルフォレンジック。「証拠保全」みたいな使われ方をしている。 うーん..難しい... ネットワークセキュリティと言うと、つい外部から内部(Ingress)の事かなと思っていたが、 SaaSの内部で好き放題されてしまうリスクがある気持ちを理解した。 外部ネットワークアクセスはこの気持ちの上に成立しているんだろう。 Python/Scala/Javaコードの分離 SQLみたいに出来ることが制限されている言語とは違い、何でもできるJava/Python/Scalaで UDFやプロシージャを書くことはセキュリティ面でリスクがいっぱい。 これらの言語で書いた処理は、パフォーマンスの観点で、ジョブの他の処理と同じVM上で動く。 マルチテナント環境上で(処理を?)分離するために(前述のように再利用できない)VMを使用する のに加え、cgroups, namespaces, secomp, eBPF, chrootのようなLinuxカーネル の要素を使ったセキュアなサンドボックスを提供することで、ジョブに割り当たったスコープの外の 情報にアクセスしたり、処理がSnowflakeの他の機能に影響したりしないようにしている。 (これらは前述されている。それぞれうっすら調べてみた。こういう風に作るんだなぁと面白い) Java/Python/Scalaで書かれた各ジョブには、実行用に新たにサンドボックスが割り当てられる。 コードの実行に最低限必要なread-onlyのソフトウエアが用意される。 サンドボックス用のchrootが用意され(/より上に行けない)、その下には書き込み可能ディレクトリが いくつかあるだけ。ジョブはそこで処理を行う。read-onlyなディレクトリがマウントされて、 JavaのJARパッケージ、Pythonパッケージや、データファイルはそこで共有される。 サンドボックス内のジョブ(のリソースを使用するプロセス)はcgroupが設定され、 使用メモリ、CPU使用量、PID使用量(プロセス数?)が制限される。 マルチプロセッサユースケース(マルチスレッド化してプロセス内で処理を並列化する話?)のため スレッド生成がサポートされる。 さらに、許可リスト(IPC,Inter Process Communicationに関するリソースを隔離する仕組み= IPC Namespace、eBPF,extended Berkley Packet Filter=カーネル内で発生した イベントで駆動する処理を安全・簡単に組み込む仕組みによって、予め許可していないartifacts がサンドボックスの外に接続するUNIXソケットを開けないようにする)によるネットワークアクセスの 制限、process namespaceによるVM上の他のプロセスを見えなくする制限、 seccomp(子プロセスのフォーク、実行可能プログラムの実行)によるカーネルAPIの不必要な 実行の回避が行われる。脅威検知のためptraceがシステムコールを管理する。 ジョブが完了した後、VM上の環境のもろもろの解放、開いたソケットのクローズ、 クレデンシャルの削除、ローカルキャッシュ、一時ファイル、ログの削除が行われる。 追加の多層防御手段?(defense-in-depth measure?)として、規定時間内に終了しなかった Python/Job/Scalaコードを実行するプロセスに対して、監視プロセスがkillシグナルを送る。 サンドボックス外に離脱したり、攻撃者が仮想ウェアハウス上のVMにプロセスを残したり ルートキットを配置する未知のリスクに備えて、Python/Java/Scalaコードを実行したVMは 「実行不可」としてマークされる。仮想ウェアハウスのスケジューリングや起動済みVMをプールする 仕組みの上で、Python/Java/Scalaコードを実行したVMが異なるアカウント・ユーザに 割り当てられると、アカウント間情報漏洩のリスクに繋がってしまうため、異なるアカウント・ユーザに 割り当たらないようになっている。Python/Java/Scalaコードを実行するVMが作られると、 アカウント専用のVMプール入れられる。新しいVMを割り当てるときは、まずはアカウント毎の空き プールからVMが選ばれる。 多数のゼロデイエクスプロイト(脆弱性が発見されてからパッチが当たるまでの期間の攻撃)が 連続して使用されると、サンドボックスが破られてしまうかもしれないが、それに備えた作りに なっている。まずエクスプロイトは、ユーザアカウントで実行中のVMに存在する。このVMは、 Snowflakeサービスや、Snowflake内のローカルネットワーク上のVMから隔離されている。 攻撃者が手にしたクレデンシャルは(サンドボックスを破壊した)特定のアカウントの特定のVMに 限定され他では使用できない。 あくまで論理的な構成が書かれているだけで「コンテナ」というワードも無いし、何かチラチラとするな。 こういうのを「コンテナエスケープ」とか言うらしい。 ソフトウエア更新の管理 Snowflakeの各機能がどうやって仮想ウェアハウスにデプロイされるかについて。 (デプロイの)ワークフローにより新機能、セキュリティアップデート、機能改善が行われる。 全ての処理は自動化されていて手作業の間違いが起きないようにしている。 このリリースプロセスにおいて、単体テスト、回帰テスト、結合テスト、性能、負荷テストが行われる。 リリースプロセスは、本番の前段の環境、または本番に近い環境で行われる。 VMがフリープールに入る前に最新のパッチが当たる。VMのStartやResumeなどの操作の後に、 フリープールからVMに割り当たったり、逆にVMからフリープールに抜けたりするが、 フリープールからVMに割り当たるプロセスの一部として、VMに最新に保つための最新のバイナリが ダウンロードされ、適用される。 Resume、Startなどのライフサイクル操作は即座に終わるように作られているが、 影響を与えないように性能要件が与えられているらしい。 SKU sizeやOSのメジャーパーションなど大きな変更の際には、未適用のVMと適用済みのVMの両方が 同時に動く状態となる。古い方は既存のジョブを実行し、新しい方は、新しいジョブを実行する。 そのようにジョブがルーティングされる。 既存のジョブを実行し終わってから、最終的に古い方は消される。 つまり、1個のウェアハウスについて、アップデートの時期を迎えると背後で(適用前後の)2個になる。 前述のようにキャッシュはVMのローカルディスクなので、もし古いウェアハウスが破棄されたとすると、 キャッシュが失われることになる。 それによりキャッシュミスが発生しパフォーマンスに影響しないように、事前に管理されているとのこと。 がんばってテストしているけれども運用環境にバグが混入することもある。 なのでアップデートをロールバックできるようになっている。 クラウドプロバイダのリージョン毎に、動作中のバイナリの背後で、古いバイナリをコピーしている。 古い方は非アクティブのままとしている。(トラフィックが発生しない?) 大規模障害に備えて、通常、新しいジョブを新しいバージョンのウェアハウスにルーティングしている ものを古いウェアハウスにルーティングするロールバックをできるようにしている。 Issueに基づいて顧客ごとに対象を絞ったロールバックをすることもあるらしい。 顧客のワークロードはそれぞれ大分ことなるので、全員が同じ頻度でバグを踏むことはないので。 特定の顧客に対して、アップデートした一部のリリースをロールバックする、みたいなことをするらしい。 リリースノートの扱いが良い感じになっていて、こういう感じで運用されているのだな、と。 将来の機能 現在、ユーザは、ワークロードの複雑さ、処理時間、コストを考慮して適切な調整を行わないといけない。 例えば、サイズ、ウェアハウスタイプ、クラスタ数、スケーリングポリシーなど。 こういったキャパシティ調整の大変さを減らしたり無くそうとしているらしい。 microVM(例えばFirecrackerやKata Containersなど)やシステムコールのオフロードに 投資し、より強力なサンドボックス分離メカニズムを実現しようとしているらしい。 それにより、Python/Javaコードで現状ではできないことが出来るようにしたいらしい。 まとめ Deep dive into the internals of Snowflake Virtual Warehousesを読んでみました。 たぶん公開されていない内部の仕組みの割合が多いのかなと思いましたがどうでしょうか。 正直かなり難しくて、途中、ほとんど写経状態になっている部分もありますが、 なるべく分からないところを調べながら、何を言いたいのかを趣旨の理解に努めました。 正直、知らなくても問題ないし、公開されていない以上、実際は違うかもしれないし、 将来変更されてしまうかもしれません。 1週間ぐらいかけて読んでみて、公開されている仕様を説明しやすくなった気はしました。

default eye-catch image.

GCSとのストレージ統合を設定した話

SnowflakeはS3, Blob, GCSを外部ステージとして設定し、データをロードする機能を備えています。 Snowflakeは各クラウドストレージとの接続方法として「ストレージ統合」の使用を推奨しています。 今回、GCSとのストレージ統合を設定し、外部ステージのロード先としてみました。 その手順等を感想を混ぜつつ書いてみようと思います。 参考にしたSnowflake公式ドキュメントは以下です。 ストレージ統合の設定方法がステップbyステップで示されています。 Google Cloud Storageの統合の構成 この記事は、公式が説明するステップに従って書いていきます。 [arst_toc tag=\"h4\"] GCSとストレージ統合 一般に、各ストレージサービスと連携する際、接続に必要な接続情報が必要です。 接続情報を自力で保管し使用する場合、セキュリティリスクに晒されるため、 Snowflakeはよりセキュアな方法として「ストレージ統合」機能を提供しています。 現在、Snowflakeは「ストレージ統合」機能のみを推奨しています。 各クラウドプロバイダ毎に実装方法は異なり、GCSの場合、Service Accountを使用します。 クラウドプロバイダ間で概念が異なるため 「AWSでいうところのXXXだよね」 は誤解を生みますが、 簡単に言えば、AWSのIAM、Azureのサービスプリンシパル、が相当すると思います。(本当??) 以下の図はその概念図です。(公式直リンク) Snowflakeは、ストレージ統合オブジェクトをGCS用のサービスアカウントと紐付けます。 このサービスアカウントは裏でSnowflakeが作成します。 言い換えると、GCSの認証責任をSnowflakeが作成するサービスアカウントに委任します。 Snowflake上のロード・アンロード操作の裏で透過的にサービスアカウントの権限参照が行われます。 Snowflakeでストレージ統合を作成する Snowflake上でストレージ統合を作る方法は以下の通りです。 実行にはACCOUNTADMINロールか、CREATE INTEGRATION権限が必要です。 ストレージ統合を作成する段階では、GCP側の権限設定は不要です。 CREATE STORAGE INTEGRATION gcs_int TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = \'GCS\' ENABLED = TRUE STORAGE_ALLOWED_LOCATIONS = (\'gcs://mybucket1/path1/\', \'gcs://mybucket2/path2/\') STORAGE_BLOCKED_LOCATIONS = (\'gcs://mybucket1/path1/sensitivedata/\', \'gcs://mybucket2/path2/sensitivedata/\'); 自動生成されたサービスアカウントの情報を取得 CREATE INTEGRATIONによって作られたSnowflake用のサービスアカウント情報を取得します。 オブジェクトの詳細情報を取得する DESC コマンドにより取得できます。 DESC STORAGE INTEGRATION gcs_int; +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+ | property | property_type | property_value | property_default | +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------| | ENABLED | Boolean | true | false | | STORAGE_ALLOWED_LOCATIONS | List | gcs://mybucket1/path1/,gcs://mybucket2/path2/ | [] | | STORAGE_BLOCKED_LOCATIONS | List | gcs://mybucket1/path1/sensitivedata/,gcs://mybucket2/path2/sensitivedata/ | [] | | STORAGE_GCP_SERVICE_ACCOUNT | String | service-account-id@project1-123456.iam.gserviceaccount.com | | +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+ STORAGE_GCP_SERVICE_ACCOUNTプロパティが、作成されたサービスアカウントです。 バケットオブジェクトにアクセスするためのサービスアカウント権限を付与する 最後に、サービスアカウントがバケットへアクセスするための権限設定を行います。 ここで初めて、GCPコンソール上での操作が必要となります。 カスタムIAMロールの作成 以下の権限を持つIAMロールを作成します。今回はロードのみを行うため以下を設定します。 パージを行う場合はobjects.delte、アンロードを行う場合はobjects.createが必要です。 storage.buckets.get storage.objects.delete storage.objects.get storage.objects.list サービスアカウントへIAMロールを割り当てる GCPのやり方に従って、サービスアカウントへIAMロールを割り当てます。 バケットに対して、サービスアカウントのアクセスを許可し、その許可内容をIAMロールで指定します。 Azureと比較するとAWSとGCPは近いなと感じます。 外部ステージの作成 まず、ステージオブジェクトを格納するSnowflakeデータベースへの権限付与を行います。 GRANT USAGE ON DATABASE mydb TO ROLE myrole; GRANT USAGE ON SCHEMA mydb.stages TO ROLE myrole; GRANT CREATE STAGE ON SCHEMA mydb.stages TO ROLE myrole; GRANT USAGE ON INTEGRATION gcs_int TO ROLE myrole; 次にストレージ統合を指定して外部ステージオブジェクトを作成します。 USE SCHEMA mydb.stages; CREATE STAGE my_gcs_stage URL = \'gcs://mybucket1\' STORAGE_INTEGRATION = gcs_int FILE_FORMAT = my_csv_format; 最後に疎通試験を行います。 copy into testtable from @my_gcs_stage pattern=\'testdata.csv\'; まとめ Snowflakeの公式ドキュメントの通りにGCSとのストレージ統合を作成しました。 また、作成したストレージ統合上に外部ステージを設定し、ロードが出来ることを確認しました。

default eye-catch image.

デプロイメントについて調べてみた話(端折り気味)

dbt Fundamentalsの「Deployment」のセクションについて理解した内容を言葉にしてみた。 自動化を解決するのにdbt Cloudを使用した記事となっている。この記事は前回の続き。 [clink url=\"https://ikuty.com/2023/08/23/dbt_docs/\"] いったんdbtそのものの理解に留めたいので、かなり端折気味。 [arst_toc tag=\"h4\"] デプロイメントとは 以下、動画の意訳。 開発環境であればIDE上でコマンド(dbt run,dbt test)をAd-Hocに必要に応じて実行してきた。 本番環境へのデプロイメントは異なる。本番環境用に分離されたブランチ(main,master?)が存在し、 そして分離されたスキーマが存在する。(開発環境と異なり)意思決定に必要なデータが存在する。 また、本番環境では(Ad-Hocではなく)dbtコマンドをスケジュール実行する。 ビジネス要件に応じてdbt run, dbt testを実行することになる。 本番環境は、分離されたブランチ・スキーマ上においてSSOT(Single Source Of Truth)を実現する。 そして、これまで作業してきた開発環境を分離し、様々な変更を本番環境への影響なしに実行する。 Snowflakeの本番検証環境分離の戦略 これ、動画にないのだけれど、そもそもSnowflake側を検証環境と本番環境に分離しないといけない。 以前、以下の記事にマルチテナントに関係するホワイトペーパーを読んで内容を整理してみたが、 そこで、環境を分離するにあたって必要な考慮事項がまとめられている。 [clink url=\"https://ikuty.com/2023/03/29/snow-multitenant-design/\"] 概ね、テナント=環境と捉えて問題ないという感想を持っている。 OPT(Object Per Tenant)、つまり、同一アカウント内に検証/本番環境を共存させるスタイルと、 APT(Account Per Tenant)、つまり、アカウント毎に検証/本番環境を分離するスタイル、 両方あると考えている。 Snowflakeにおいて、アカウントを跨いでオブジェクトを共有できないことがポイントで、 それを不自由な制限事項と取るか、確実な安全の確保と取るか、どちらかの選択となる。 OPTであれAPTであれ、オブジェクト数の増加に伴いTerraform等のツールが事実上必須となる ことから、実はAPTでも事実上の共有をしているのと同じとなり、APTの方が良いのかなと思っている。 URLが分離するので運用の調整がしやすいし、第一に安全性にグレーな部分が無くなる。 dbt Cloud上でのデプロイメントの準備 dbt Coreの各コマンドをスケジュール実行することで機能を実現する、という特徴から、 そもそも検証/本番環境を実現するためにはdbt Coreだけでは機能が足りない。 dbt Cloudには、検証/本番環境の実現に必要な機能が乗っている。 なので、このDeploymentセクションはdbt Cloudの機能説明に近い。 ちょっと記事の趣旨から外れてしまうのでサラッと眺めるに留める。 dbt Cloud上のDeploymentメニュー配下には以下のメニューがある。 - Run History - Jobs - Environments - Data Sources Environmentsから、Deployment単位に対する操作が行える。 Create Environmentの操作により、新たなEnvironmentを作成できる。 - General Settings - Name - Type - dbt Version - [Check] Only run on cuustom branch データプラットフォーム(DW)との接続に必要な設定を行う。 DW側がアカウント分離方式で環境分離しているのであれば、ここで本番検証の差異が発生する。 - Deployment Connection - (Overwrite connection settings for this environment, only certain fields are able to be overwrite ) - Account - Role (Optional) - Database - Warehouse 接続に必要なCredentialsの設定を行う。環境分離されている粒度で接続を行う。 - Deployment Credentials - (Enter your deployment credentials here, dbt will use these credentials to connect to your database and run scheduled jobs in this environment ) - Auth method - Username - Password - Schema ジョブ実行時の設定を行う。タイムアウト設定や、都度ドキュメントを作るか、都度source freshnesを判定するか、 また、そもそも何のコマンドを実行するか。 - Execution Settings - Run Timeout - (Number of seconds a run will execute before it is canceled by dbt Cloud. Set to 0 riever time out for this job) - Defer to a previous run state ? - Generate docs on run - (Automatically generate updated project docs each time this job runs) - Run source freshness - (Enables dbt source freshness as the first step on this job, without breaking subsequent jobs) - Commands - (This is where you can pass whatever dbt commands you would like. So this could be dbt run, dbt tests, dbt seed, whatever it might be) 省略... - Helpful Resouruces - Enabling CI - Souurce Freshness dbtがどういったトリガでジョブを起動するかを設定する。 結構いろいろなことができる。スケジュール実行だけでなくWebhook、APIなんかも設定できる。 細かいところは省略。 - Triggers - Configure when and how dbt should trigger this job. - Schedulue - [Check] Run on Schedule - [Radio] Schedule Days - Subday,Monday,Tuesday,Wednesday,Thursday,Friday,Saturday - [Radio] Enter custom schedule - Timing - Every [??] hours (Starting at midnight UTC) - At exact intervals [??] UTC (e.g.\"0,12,23\" for midnight,noon,and 11PM UTC) - Webhooks - [Check] Run on Pull Requests ? - (省略) - API (省略) デプロイメント まとめ Historiesから、Jobの実行の実行履歴を観察できる。 以下、上からGit RepositoryからCloneした後、Snowflakeとの接続諸々を実行、 dbt deps (dbt Fundamentalsで出てきてないが、モジュール化されたパッケージのインストール)を実行。 まとめ dbt Fundamentalsの動画を聴いて、dbtのデプロイメント機能を追ってみた。 dbtそのものを追いたいので大分端折ってしまった。あまり記事に意味がないかもしれない。 自分の経験が追いついてきたらもう一度トライしてみたい。

default eye-catch image.

dbtのドキュメント生成機能について調べてみた話

The dbt Viewpointにおいて、dbtはソフトウエア開発のシナリオで解決されてきた諸々を考慮した F/Wであることが説明されている。その中で、バージョン管理、継続的なテストによる品質保証、そして、 ドキュメンテーションの重要性が改めて説明されている。dbtの対象はアプリケーション開発と地続き。 The dbt Viewpointについて以下の記事で書いてみた。 [clink url=\"https://ikuty.com/2023/08/23/dbt-viewpoint/\"] dbt Fundamentalsの「Documentations」セクションを読んで理解した内容を書いてみる。 [arst_toc tag=\"h4\"] なぜドキュメンテーションが重要なのか? dbtが想定するAnalytics Engineeringの新ワークフローで色々解決しようとしている。 dbtが備えるドキュメンテーション機能が存在する理由について言及されている。 ちょっとどうかな..と思うことが多かったが言及されている内容をまとめてみる。 ユーザによる自己解決 ソフトウエア開発のシナリオでは、ドキュメントは主に開発者間のコラボレーションが目的だが、 Analytics Engineeringの世界ではデータの作成者と消費者の間のコラボレーションが目的。 データの作成者と消費者がデータの仕様を共有することは実際の運用で最も重要だと思う。 例えば「どうやって計算したのか」、「どこから持ってきたのか」といった各自の疑問を自己解決して もらいたい、というモチベがあると言及している。 成果物とドキュメントの統合 ソフトウエアの成果物とドキュメントが分離すると管理が面倒になる。 ソフトウエアのアップデートにドキュメントが追いつかず嘘八百となることは良く観察される。 dbtは、ドキュメントを成果物の一部とすることでその問題を解決することを言及している。 記述の簡便さ YAMLで書けるので新しい文法の理解は不要としている。 ただしYAMLは厳密なので、それはそれで定義を理解する必要はあると思う。 既存のjavadoc的何かとの比較ではなく、何も無いところとの比較ではその通り。 dbtのドキュメンテーション では、dbtでそれをどうやるのか。 DAGの観察 dbtには、Sourceから最後のモデルまでの流れを自動的に可視化する機能がある。 全てのmodelの依存関係をSoruceから順番に示してくれる。 特に意識せずとも、model内のref関数が依存関係を示す情報として使われる。 動画では\"lineage\"ではなく\"DAG\"と言われている。 Model内のドキュメント dbtには、modelファイルに特定のテキストを記述することでモデルに情報を付与する機能がある。 どこか別のリポジトリではなく、modelファイルに直接書けることが良いのだよ、と言われている。 modelを書いた後、一呼吸おいて別のところで書くのではなく、modelと一緒に書くのだよとか。 これどうだろうな...。急いでたらSQLだけ書いてコメント直さないなんてありそうだが。 Modelのドキュメント 直書きとdoc blocks STAGING層にある2つのモデルが例として挙げられている。 model定義の並び、column定義の並びにそれぞれdescriptionを直書きスタイルで配置している。 version: 2 models: - name: stg_customers description: Staged customer data from our jaffle shop app. columns: - name: customer_id description: The primary key for customers. tests: - unique - not_null - name: stg_orders description: Staged order data from our jaffle shop app. columns: - name: order_id description: Primary key for orders. tests: - unique - not_null - name: status description: \'{{ doc(\"order_status\") }}\' tests: - accepted_values: values: - completed - shipped - returned - placed - return_pending - name: customer_id description: Foreign key to stg_customers.customer_id. tests: - relationships: to: ref(\'stg_customers\') field: customer_id もう1つの方法として、docブロック( {{ doc() }} )を使う方法が説明されている。 上のstatusのdescriptionようにdoc()の中に識別子を書いて、 別ファイル(.md)の中でその識別子の詳細を書く。要は、長いテキストを定義の外に書くスタイル。 (さっき、同じファイルに書くところが良いって言ってたばかりなのに...) {% docs order_status %} One of the following values: | status | definition | |----------------|--------------------------------------------------| | placed | Order placed, not yet shipped | | shipped | Order has been shipped, not yet been delivered | | completed | Order has been received by customers | | return pending | Customer indicated they want to return this item | | returned | Item has been returned | {% enddocs %} Sourceのドキュメント modelの場合と同様に、source定義,table定義,column定義にdescriptionを配置している。 version: 2 sources: - name: jaffle_shop description: A clone of a Postgres application database. database: raw schema: jaffle_shop tables: - name: customers description: Raw customers data. columns: - name: id description: Primary key for customers. tests: - unique - not_null - name: orders description: Raw orders data. columns: - name: id description: Primary key for orders. tests: - unique - not_null loaded_at_field: _etl_loaded_at freshness: warn_after: {count: 12, period: hour} error_after: {count: 24, period: hour} ドキュメントの生成と閲覧 動画ではdbt Cloudの例が説明されていて、IDE上でシームレスにドキュメントが表示されている。 説明されているもので全部ではなさそうで、きっとFundamentals用に概要が話されている。 動画で説明されているドキュメントの項目は以下の通り。 - dbt docs generate でドキュメントを生成する - ymlファイルとその他のファイルに付加した情報がドキュメントに出力される - (dbt Cloudでは) View Docsボタンの押下により生成されたドキュメントを閲覧できる - 左ペインにプロジェクト内のファイル階層が表示される。(ModelやSourceが表示される) - 左ペインからModel(yml)を選択すると、そのymlと対応するドキュメントが表示される - Detailとして、TAGS,OWNER,TYPE,PACKAGE,LANGUAGE,RELATION。 - DESCRIPTIONとして、ymlに付加したdescription。 - descriptionがdoc blocksの場合、押下で展開される。 - Columnsとして、列名,型,description,TESTの有無,追加の情報 - Referenced Byとして、どのモデルから参照されているか(つまり下流にあるモデル) - またはどのテストからそのモデルが参照されているか - Depends Onとして、どのモデルに依存しているか(つまり上流にあるモデル) - Codeとして、ymlファイルの中身、と、コンパイル済みコード - yml内のjinjaテンプレートがコンパイル済みコードで展開されている。便利。 - model選択中に右下の View Lineage Graphボタン押下でmodelを含むリネージが表示される - プロジェクトレベルで View Lineage Graphボタン押下で全体のリネージが表示される まとめ dbt Fundamentalsの動画を聴いて、dbtのドキュメント自動生成機能を追ってみた。 Model、SourceにDescriptionを付与し、自動生成ドキュメントに出力されることを確認した。 dbt Cloudでしか試していない。dbt Coreで出すとファイルが吐かれるのだろうか。 ちょっと流石にこれだけだと機能が足りない気がする。 きっと他で補完は必要だと思う。読めてないだけで本体に何かあるのかもだけれども いったんFundamentalsの読みは終了。

default eye-catch image.

The dbt Viewpointを読んでみた話

古今東西、フレームワークは過去の不合理や不便さに対する解決策として生まれていると思う。 特に、アプリケーション開発のシナリオで圧倒的な生産性の向上を生み出す何かが生まれたりする。 それは過去があまりにも酷かったのもあるし、リアーキテクチャのセンスが天才的なものもあると思う。 dbtは過去の分析プロジェクトで得られたいくつかの論点を解決するように作られている。 dbtは分析における様々なタスク(以下、ワークフロー)を効率よく解決するためのツールである。 分析用ではあるが、モダンなアプリケーション開発フレームワークが解決した多くの論点に基づいて 開発されている。その由来の詳細について「The dbt Viewpoint」で説明されているので読んでみた。 [clink implicit=\"false\" url=\"https://docs.getdbt.com/community/resources/viewpoint\" imgurl=\"https://d33wubrfki0l68.cloudfront.net/38a468b6bb7aeae799ea43bb9510af0753b90d91/42e9b/img/dbt-logo.svg\" title=\"The dbt Viewpoint\" excerpt=\"In 2015-2016, a team of folks at RJMetrics had the opportunity to observe, and participate in, a significant evolution of the analytics ecosystem. The seeds of dbt were conceived in this environment, and the viewpoint below was written to reflect what we had learned and how we believed the world should be different. dbt is our attempt to address the workflow challenges we observed, and as such, this viewpoint is the most foundational statement of the dbt project\'s goals.\"] 訳と文章作成4時間以内のスピードチャレンジなので間違っていたらすみません。 原文は平易かつ丁寧な表現で面白いので読んでみてください。 [arst_toc tag=\"h4\"] dbtの由来 2015-16年, RJMetrics社から派生した分析プロジェクトは、分析のエコシステムにおける重要な発展 を観察し参加する機会を得た。dbtの種となる考え方はこの環境で考案された。 The dbt Viewpoint(この記事)に書かれている論点は、チームが学んだこと、 そして他がどう変わるべきと信じているか、について反映させるために書かれている。 dbtは、私たちが観察したワークフローの課題に対処するための私たちの試みであり、 (この資料に書いた)論点は、dbtプロジェクトのゴールを示す最も基本的な記述である。 今日のAnalytics 真の成熟した分析組織においては、プロプライエタリソフトウエアが過去のものとなり、 データ統合を行うコードとツール、ハイパフォーマンス分析データベース、SQL/R/Python、 可視化ツール、を組み合わせてソリューションを組む流れに進んでいる。 この変化により、重要な可能性を見出せるようになったが、分析チームは依然として、 引き続き、高品質、低遅延に対する高い目標に対峙しないといけない。 我々は分析チームはワークフローに関する問題を抱えていると信じている。 分析者はしばしば離れた環境で操作を行い、結果として部分的な最適化を生み出している。 結果、ナレッジはサイロ化されてしまう。同僚が既にどこかで書いたコードを再生産したりしている。 あまり馴染みのないデータのニュアンスを掴めない。 洗練されたチームであっても解決できないことを何百回も聴き、概して現状維持だと認識している。 組織の決定のスピードと品質は上がらない。 分析はこのようである必要はない(?)。実際、これらの問題についてソフトウエア開発チームが 既に解決している。ソフトウエアエンジニアが早く高品質で成果物を生産するために行なっている 多人数共同開発と同じテクニックを分析にも適用できる。 Analyticsは共同作業的 成熟した分析チームが持つべきテクニックやワークフローはソフトウエア開発に見られる以下の 共同作業的な特徴を持つべき。 バージョンコントロール 分析を行うコードは、それがPythonなのか、SQLなのか、Javaなのかそれ以外なのかによらず、 バージョンコントロールされるべきだ。分析はデータとビジネスが変化していく中で、 誰が何をいつ変更したのかを知ることが重要になる。 品質保証 悪いデータは悪い分析につながる。そして悪い分析は悪い決定につながる。 データや分析結果を作り出すコードはレビューされ、テストされるべきだ。 Bad data can lead to bad analyses, and bad analyses can lead to bad decisions. Any code that generates data or analysis should be reviewed and tested. ドキュメンテーション あなたが書いた分析はソフトウエアアプリケーションである。他のソフトウエアアプリケーションと同様に 他の誰かが「それをどのように使うのか」疑問を抱く。これは簡単に見えるかもしれないが、 (例えば?)\"収益線\"に様々な意味がある可能性があるように、そう簡単ではない。 コードには、それがどのように解釈されるべきか、基本的な記述を付与すべきだ。 また、疑問が生まれる度に、チームメンバが既存のドキュメントに追加するべきだ。 モジュール性(Modularity) もしあなたが、あなたやあなたの同僚が会社の収益について一連の分析を行うのであれば、 あなたがたは同じ入力データを使うべきだ。「コピペ」は良い方法ではない。 背後にあるデータが変化したとして、そのデータが使われる全ての箇所を変更しないといけない。 代わりに、パブリックなインターフェースとしてデータのスキーマを考慮すべきだ。 一貫性のあるスキーマを公開して、ビジネスロジックが変更されたときに一緒に変更できるような テーブル、ビュー、や他のデータセットを作ること。 Analyticsのコードはアセットである その分析を得るために要したコード、プロセス、ツールは組織が行った中心的な投資である。 成熟した分析組織のワークフローは次に示す特徴を備えるべきで、それにより組織の投資を 守って育てるべきだ。(so as to を意訳した) 環境 Analyticsは複数の環境を必要とする。分析者はユーザに影響を与えずに作業する自由を必要とするが、 一方で、ユーザは彼らの普段の仕事を行うため、依存するデータを信頼できるようにサービスレベルの 保証を必要とする。 サービスレベルの保証 分析チームは本番環境に投入された全ての分析の精度に責任をもつ必要がある。(stand behind.) エラーは本番環境におけるバグと同じレベルの緊急性をもって扱われるべきだ。 本番環境から不要になった全てのコードは非推奨となるプロセスで扱われるべきだ。 保守性を考慮した設計 ソフトウエア開発における大半のコストは保守フェーズに発生する。 このため、ソフトウエアエンジニアは保守性の観点を持ってコードを記述する。 しかしながら、分析コードはしばしば(ソフトウエア開発のものより)扱いづらいものである。 背後にあるデータの変化により、予測し修正することが難しい形で大半の分析コードは壊れてしまう。 分析コードは保守性の観点をもって書かれるべきだ。スキーマとデータに対する将来の変化を予測し、 対応する影響を最小限に抑えるためにコードを書くべきだ。 分析ワークフローは自動化ツールを必要としている しばしば、多くの分析のためのワークフローは手作業で行われる。 分析者は、sourceからdestinationへ、stageからstageへの移送手段構築にほとんどの時間を使う。 ソフトウエアエンジニアは彼らの仕事で起こる手作業の部分の広範囲をカバーするツールを構築しているう。 私たちが推測している分析用途のワークフローを実装するために、似たようなツールが必要となるだろう。 以下は自動化されたワークフローの一例。 このようなワークフローは単一のコマンドで実行できるように構築されるべき。 modelと分析は複数のソース管理リポジトリからダウンロードされる コードは与えられた環境に適合して構成される コードはテストされる、そして コードはデプロイされる まとめ The dbt Viewpointを読んで訳してみた。 アプリフレームワークで開発経験があると、かなり似た雰囲気を感じるツールなのだが、 どうやら本当にアプリ開発の分野にインスパイアされたツールだったということがわかった。

default eye-catch image.

テストとDAGの構築について考えてみた話

dbt Fundamentalsの「Tests」のセクションについて理解した内容を言葉にしてみた。 Models,Sourcesの各セクションと比較して1行が含む意味が増えている印象がある。 基礎的な内容をインプットするには結構なスローペースだとは思うが抑えていこうと思う。 この記事は前回の記事(以下)の続き。 [clink url=\"https://ikuty.com/2023/07/23/dbt_sources/\"] これだけで「データ品質」をどうこうするには足りないと思う。 データ品質に関わる何かについてはどこかで入門して記事化しようと思います。 最後の2節は「それってあなたの感想ですよね」と言って読み飛ばしてください。 [arst_toc tag=\"h4\"] Generic Tests Generic Testsは、Assertionの定義をyaml形式で記述するタイプのテスト。 dbtには4つのGeneric Testsがビルトインで定義されていて、特別なインストール不要で利用できる。 Generic Testsに関係する公式のリファレンスページは以下。 About tests property Assertの定義を書いたyaml形式のファイルをmode-pathに配置し、dbt testコマンドで実行する。 not_null 指定したテーブル・カラムの全ての値がNULLでないことをAssertする。 version: 2 models: - name: orders columns: - name: order_id tests: - not_null unique 指定したテーブル・カラムがユニークであることをAssertする。 OPTIONALな属性であるconfigにより、Assertの範囲を追加することができる。 ersion: 2 models: - name: orders columns: - name: order_id tests: - unique: config: where: \"order_id > 21\" accepted_values 指定したテーブル・カラムにvaluesで指定した値のみが存在することをAssertする。 OPTIONALな属性であるquoteにより、values内に整数値、ブール値などを配置できる。 まぁ\"false\"とfalseは違うし、\"100\"と100は違うので、そういう配慮。 version: 2 models: - name: orders columns: - name: status tests: - accepted_values: values: [\'placed\', \'shipped\', \'completed\', \'returned\'] - name: status_id tests: - accepted_values: values: [1, 2, 3, 4] quote: false relationships 参照整合性制約、つまり外部キーが参照するレコードが相手先テーブルに存在することをAssertする。 公式には、childテーブルとparentテーブルという表現が出てくる。 This test validates that all of the records in a child table have a corresponding record in a parent table. This property is referred to as \"referential integrity\". これは、yaml上の上位階層と下位階層の関係を言っていて、以下であればordersのcustomer_idと customersのidの間で参照整合性が成立するかをAssertする。 toの先として、ref関数、またはsource関数を指定する。 version: 2 models: - name: orders columns: - name: customer_id tests: - relationships: to: ref(\'customers\') field: id dbt test dbt testコマンドによりテストを開始する。 Test an expression 上記までは特定のカラムに対するAssertionを定義するものだった。 もし複数カラムを使用したAssertionを定義しようとすると、columns:以下に書けない。 この場合、models:直下にcolumns:ではなくtests:を配置することができる。 以下は(ちょっと不思議)、ordersテーブルにある「country_code」と「order_id」をハイフンで繋いだ 文字列をカラム名として持つカラムがユニークであることをAssertする。 例はアレだけど、式の評価(expression)結果をAssert定義に使用できる、といったところ。 version: 2 models: - name: orders tests: - unique: column_name: \"country_code || \'-\' || order_id\" Use custom generic test 自力で実装したテストをビルトインテストの代わりに使用することができる。 この記事では詳細は省略。 version: 2 models: - name: orders columns: - name: order_id tests: - primary_key # name of my custom generic test Singular Tests Assertをyml形式で記述するスタイルの他に、単体のSELECT文形式で定義するスタイルもある。 SELECT文を1個書いたファイルをtest-pathに配置する。 SELECT文が1件以上のレコードを返す場合、そのテストは失敗、という扱いになる。 dbt Fundamentalsの動画では1例だけサラッと説明されている。 -- Refunds have a negative amount, so the total amount should always be >= 0. -- Therefore return records where this isn\'t true to make the test fail. select order_id, sum(amount) as total_amount from {{ ref(\'stg_payments\') }} group by 1 having not(total_amount >= 0) Source Tests ModelだけでなくSourceに対してもテストを書ける。(テストを書ける対象は他にもある) dbt Fundamentalsの動画に出てくる例では、前節で説明されていたSourceにテストを追加している。 version: 2 sources: - name: jaffle_shop database: raw schema: jaffle_shop tables: - name: customers columns: - name: id tests: - unique - not_null - name: orders columns: - name: id tests: - unique - not_null loaded_at_field: _etl_loaded_at freshness: warn_after: {count: 12, period: hour} error_after: {count: 24, period: hour} testを実行するコマンド Generic tests、Singular testsを実行するコマンドは以下の通り。 dbt Fundamentalsには一部しか載っていない。(★を付けた) # ★ run all generic and singular tests in your project. $ dbt test # ★ run only tests defined singularly $ dbt test --select test_type:singular # ★ run only tests defined generically $ dbt test --select test_type:generic # ★ run source test $ dbt test --select source:test_name # run tests for one_specific_model $ dbt test --select one_specific_model # run tests for all models in package $ dbt test --select some_package.* # run singular tests limited to one_specific_model $ dbt test --select one_specific_model,test_type:singular # run generic tests limited to one_specific_model $ dbt test --select one_specific_model,test_type:generic dbt buildとDAGの仕組み 先に作ったモデルを使って後のモデルを作る、という仕組みを考えようとすると、 先の実行が上手くいかなかった場合に、後の処理を止めたいという欲求は必ず発生する。 dbt buildコマンドにより、上流側でrunとtestを行い、その結果に基づいて 下流側のrunとtestを実行する、という仕組みを作ることができる。 # hogeモデルとその上流のDAGを構築して実行 $ dbt build +hoge これ自力で作ったら大変だし、本当にその通り動くのか確認するのも大変。 たぶんこれがdbtを使う大きなメリットなんだろうな。 dbt FnudamentalsのTestsセクション内のThe dbt Build commandというパートで、 実例を使ってDAGが最後まで通るパターン、途中でテストがNGになって止まるパターンの両方が すごくライトな感じで説明されていてイメージ掴むにはとても良かった。 公式により詳細な記述があるので追ってみる。 About dbt build command (感想1)ソフトウエア開発と品質保証 昔、とあるメーカーで医療機器・システムの設計・実装・保守を長いことやっていたことがある。 医療機器が医療機器であるためには、様々な法令に基づいた品質保証を行う必要がある。 おそらく設計・開発よりも品質保証と保守のウェイトが圧倒的に高かっただろう記憶がある。 患者、医療従事者ほか、品質が保証されていないとヤバい人たちのために品質を保証する。 それはほぼ自分達のためであり、こんな具合にエコシステム全体で品質保証を言ってた。 言いたいのは、綺麗事でテストしていたのではなく明日の飯のためにテストしていた。 dbt Fundamentalsでは「データは信頼に値する品質である必要がある」みたいに書かれている。 データ分析はデータに基づいてアナリストが分析した結果で意思決定する人がいる。 嘘八百並んだ虚構のパイプラインを構築してデータ分析とか、まぁないよねと思う。 というか綺麗事抜きに持続的にお仕事もらえないよね、と思う。 (感想2)品質保証の品質は誰が保証するのか dbt FundamentalsにはAnalytics Engineering版の自動テストをやりましょう、 という世界観が書かれている。 個人的には、品質保証がコアパートであるプロダクトとOSSの相性はあまり良くないと感じている。 それは、品質保証の品質を保証するコストが高すぎるから、なのだろうなと勝手に思っている。 dbtでは、Analytics EngineerはData EngineerとAnalystの間に位置付けられ、 もしこれがビジネスに寄ったエンジニアなのであれば、どこまで寄ったとしても埋まらない溝はある。 分析要件を熟知したアナリストがdbtを使うのであればワンチャン行ける気がするが大変だろう。 ymlで書いたTestが保証する範囲をエンジニアとアナリストが連携できるのか。 Analytics Engineerって、本当に実在できるのだろうか。むずくね?、と思う。 結果が致命的になるドメインとか、あまりにほ品質保証のコストが高いドメインとか、 おそらく、プロの品質保証家がいるはずだろうから、彼らの指示に従うべきで、 そうではない「普通の」品質保証レベルを効率的に達成するためのCI/CDだと思う。 まとめ dbt Fundamentalsの「Tests」セクションを聴き、公式の情報で補足しながら内容をまとめた。 4つのビルトインテストの仕様を理解した。dbtにおけるDAGの構築の仕組みについて公式を読み、 理解した内容をまとめてみた。 大きなトピックである「データ品質」について、どこかで入門したいと思う。

default eye-catch image.

ソースについて考えてみた話

dbt Fundamentalsの「Sources」のセクションについて理解した内容を言葉にしてみた。 かなりの部分で感想を織り交ぜてみた。この記事は前回の続き。 [clink url=\"https://ikuty.com/2023/07/15/dbt_models/\"] dbtはModularityのコンセプトに基づいてクエリをバラし上流から下流へと続くデータフローを 構築するフレームワーク。上流から下流までどのようにバラしても自由ではあるが、 何度か似たような仕組みを作っていくと、共通して見られる特徴に気づくことがある。 例えば、データを取り込んで、加工して綺麗にして、中間処理をして、BIやMLに渡す、といったように、 機能単位で処理をレイヤ化しておくと見通しが良くなることに気づく。 dbtはそのバラし方についてある程度規定していて以下のようにレイヤリングすることが示されている。 Source Staging Intermediate Fact Dimension この記事はSourceレイヤについての記事。 [arst_toc tag=\"h4\"] 外部インターフェースとDRY原則 データ分析基盤の構築に限らず、システムを設計する際に外部データとのインターフェースを 独立して検討が必要な設計項目とすることが多い気がしている。 それは何故かというと「自分たちの管理外にあるものであって変わり続けるもの」だからだと思う。 インターフェース仕様を定義し変化を制御しようとする。制御下にある変化を受け入れる必要がある。 変化する対象をハードコードした場合、変化が起きたときに全てを変更する必要がある。 もし対象を1箇所に記述しそれを論理的に参照する仕組みがあれば、変化に対する変更は1回で済む。 実際には、想定する粒度や概念の範囲を逸脱した場合は書き直しが必要だろうと思うので、 それは見通しが甘かったことに対する罰として受け入れないといけない。 インターフェース仕様の想定の甘さはシステム内部のそれと比べて影響が大きい。 モダンな言語やフレームワークでは、同じ変更を何度も行わせるようなタルいことをさせないように 作られている。これはDRY(Don\'t Repeat Yourself)原則と言い一般的な設計論として話される。 dbtはELTを前提としていて、外部データはそのままテーブルにロードすべし、としている。 dbtにとって、外部データをロードした一番最初のテーブルが「生」であり変化し得るが、 DRY原則に従って、1箇所で定義し抽象化したものを使いまわせるようになっている。 Sourceの定義と利用 さて、dbtのSourceによってどのようにDRYが実現できるか見てみる。まずは生クエリ。 CTEにより記述され、1個目のsourceを2個目のstagedで使用している。 ikuty_raw.jaffle_shop.customers が生データを格納したテーブル。 ハードコードされた状態。 with source as ( select * from ikuty_raw.jaffle_shop.customers ), staged as ( select id as customer_id, first_name, last_name from source ) select * from staged ; 次にdbt。model-pathにsource.ymlというファイルを配置する。 生クエリのsourceがYMLで定義されている。jaffle_shopという名前のSourceを指定している。 スキーマやテーブルが変更されたとしても、このファイルを変更すれば良い。 version: 2 sources: - name: jaffle_shop database: ikuty_raw schema: jaffle_shop tables: - name: customers - name: orders 生クエリをモデル上で以下のように書き直す。 この際、上で定義したjaffle_shop Sourceを {{source()}} により参照している。 select id as customer_id, first_name, last_name from {{ source(\'jaffle_shop\', \'customers\') }} 古いデータに基づく分析から得られた意思決定はゴミ? データ分析界隈では「データの新しさ」がしばしば重要なトピックとなる。 しかし、それはなぜなのか上手い説明を聞いたことがなかった。 dbt公式が用意するドキュメントの中に、dbtが生まれた経緯が説明されているものがあり、 その中で、風が吹けば桶屋が儲かる的なノリでこう書いてある。 explicitにデータ鮮度の重要さを示すものではないが、 「古いデータに基づく分析から得られた意思決定はゴミ」ぐらいの気持ちになった。 The dbt Viewpoint https://docs.getdbt.com/community/resources/viewpoint Quality Assurance Bad data can lead to bad analyses, and bad analyses can lead to bad decisions. データ鮮度の定義と実行 データ分析基盤には、データを定期的に取り込む類のタスクがある。 dbtは取り込みの度に取り込んだデータの鮮度を確認できる機能を備えている。 「定期的」とは、serviceやdaemon的な何かが動いてデータソースを見にいく訳ではなく、 dbtを実行して取り込む度に毎度値を参照するという意味。 物理的には、「新鮮であると見做すことができるレコードと現在との間の許容可能な時間」 を定義し、許容できない時間差を検知することでデータが古いのか新しいのかを判断させる。 この辺り、Declarativeであり、新しい何か的な印象を持つ。 sourceの定義ファイルにfreshnessブロックを定義する。 公式による仕様の説明は以下の通り。 version: 2 sources: - name: freshness: warn_after: count: period: minute | hour | day error_after: count: period: minute | hour | day filter: loaded_at_field: tables: - name: freshness: warn_after: count: period: minute | hour | day error_after: count: period: minute | hour | day filter: loaded_at_field: ... loaded_at_fieldにデータ鮮度の判定に利用するカラムを指定する。 loaded_at_fieldと現在時刻の差がcount、periodに定義した時間を超えていた場合にレポートする。 レポートには警告とエラーの2種類が存在し、warn_afterに警告、error_afterにエラーの場合を書く。 count,periodの単語選びのセンスがちょっと分からない。periodが単位、countが数値である。 例えばcount=2、period=dayなら「2日」。Declarativeに読めば良いのか? freshnessブロックは継承関係を持たせることができる。 つまりsources直下に書いたものでデフォルトを定義し、tables配下に書いたもので上書きできる。 loaded_at_fieldカラムはtimestamp型かつUTCである必要があり、変換例が公式に載っている。 # If using a date field, you may have to cast it to a timestamp: loaded_at_field: \"completed_date::timestamp\" # Or, depending on your SQL variant: loaded_at_field: \"CAST(completed_date AS TIMESTAMP)\" # If using a non-UTC timestamp, cast it to UTC first: loaded_at_field: \"convert_timezone(\'UTC\', \'Australia/Sydney\', created_at_local)\" ここでは以下のような定義とする。 version: 2 sources: - name: jaffle_shop database: ikuty_raw schema: jaffle_shop tables: - name: customers - name: orders loaded_at_field: _etl_loaded_at freshness: warn_after: {count: 12, period: hour} error_after: {count: 24, period: hour} dbt source freshnessコマンドによりデータ鮮度が評価される。 _etl_loaded_atの最大値が現在よりも12時間以上前だったのでWARNが出た。 $ dbt source freshness 15:10:34 Running with dbt=1.5.0 15:10:34 Found 1 model, 0 tests, 0 snapshots, 0 analyses, 321 macros, 0 operations, 0 seed files, 2 sources, 0 exposures, 0 metrics, 0 groups 15:10:34 15:10:35 Concurrency: 1 threads (target=\'dev\') 15:10:35 15:10:35 1 of 1 START freshness of jaffle_shop.orders ................................... [RUN] 15:10:37 1 of 1 WARN freshness of jaffle_shop.orders .................................... [WARN in 1.72s] 15:10:37 Done. 宣言的記述について ソフトウエアパラダイムの1つとして市民権を得ている宣言的(Declarative)記述について書いてみる。 開発者人口が多いVue.jsかReact.jsで爆発的に認知され(ひと昔前に)一気に当たり前になった印象がある。 歴史的な経緯からフロントコードは非同期かつイベント駆動であって、酷い可読性だった覚えがある。 onClickの中にonClickを書いて、その中にonClickを書いて...みたいなことが起こり得た。 JSの言語仕様でasync awaitパターンがサポートされステートマシンを合理的に記述できるようになった。 「テキストボックスにバインドする変数はX」と書くだけで、関係する処理が省略できてむっちゃ楽。 個人的には構成管理ツールのAnsibleで宣言的記述の良さを実感できた気がする。 構成管理の界隈では、冪等性が重要で例えば「nginxは192.168.1.64:8080でlistenすること」 のように定義しさえすれば、何度実行しても設定が定義値であることが保証される仕組みが欲しい。 もし状態を宣言的に記述できなければ、細かい処理を自力で実装しなければならなくなる。 「listen: 192.168.1.64:8080」と書いて実行しさえすれば良いならむっちゃ楽。 ミドルウェアとデータの違いはあるが、実体があるものを抽象化し振る舞いを状態定義する様から、 dbtはAnsibleやTerraformに近い気がする。(データの構成管理をしているような...) freshnessを確認するコードをJavaで書けとか言われたらタル過ぎるので、 dbtで当たり前のように宣言的に記述できるのはありがたい。 まとめ dbt Fundamentalsの「Sources」セクションを聴いて、内容を文書化してみた。 Sourceを抽象化することでDRYを実現できること、 抽象化した先でデータ鮮度の確認ができることについて定義や実行例を追って確認してみた。

default eye-catch image.

モデルについて考えてみた話

dbt Fundamentalsの「Models」のセクションについて、理解した内容を言葉にしてみた。 かなりの部分で感想を織り交ぜてみた。この記事は前回の記事(以下)の続き。 [clink url=\"https://ikuty.com/2023/07/10/dbt-word/\"] 実践的なコンテンツというよりは、概念の理解を優先した入門用のコンテンツであって、 これでモノを作れることはないと思うし、資格対策には不足していると思う。 動画の講師の方は自己紹介で教師のバックグラウンドがあると話されている。 個人的にはUdemyの類似品より構成と英語の発音がわかりやすいと感じる。 [arst_toc tag=\"h4\"] クエリをバラして理解しやすいようにしたい 関数型言語は、関数が再起的に評価されることで最終的な応答が確定する。 手続き型言語と異なり、再帰の評価の途中で一時停止して内容を観察することが難しい。 SQLは関数型言語ではないが、内包するSQLの評価を一時停止して観察することが難しい、 という点で近い。制御がない一筆書きである点が複雑さから逃れる理由になっていそう。 ビジネスロジックが乗った巨大なクエリは確かに一筆書きではあるが、理解しづらい。 クエリをバラしたら理解しやすいんじゃね? みたいな動機があるのだと思う。 ただバラして制御するとそれはもう手続きなので、バラすけど再利用するだけに留める。 バラして、途中経過を観察したり動きを制御したり、そういう仕組みをdbtが提供する。 Modularity クエリをバラすことを、dbtでは「Modularity(モジュール性)」という用語で説明している。 動画では「車の製造」が例えとして挙げられている。パーツとパーツを繋げて車を製造すると。 Modularityは構造化されていて、上流(Upstream)から下流(Downstream)へと繋がる。 Upstreamで作ったパーツをDownstreamで再利用する、という仕組み。 1個1個のパーツは、CTE(Common Table Expression)を利用して表現される。 CTEは標準SQLに定義されていて、クエリの再利用性を高める表現手段。 要は、dbtはCTEを使ってクエリの再利用性を強制するF/Wなんだろうと思う。 CTEと論理レイヤとモデル 実際に動作する生クエリを書くのではなく、論理的な記述レイヤを設けている。 論理レイヤの記述フォーマットとして、jinjaテンプレートが採用されている。 jinjaテンプレートにマクロを記述することで論理レイヤの表現能力を獲得している様子。 Modularityの観点でCTEで各パーツを定義し、Up->Downのフローを論理的に定義する。 dbtが論理レイヤをコンパイルして物理SQLを吐くという仕様になっている。 このパーツのことをdbtでは「モデル」と呼んでいる様子。 1個のSELECT文が1個のモデルと対応する制約が与えられる。 これにより、Up側モデルをDown側モデルで再利用することと、クエリ結果の再利用を紐付けている。 Materialization パーツは個別に具体的なSQLにコンパイルされる。これには「Materialize」という用語が付いている。 Upstream側を実体化する際、その実体化の手段を選択できる。 View モデルをViewとして実体化する。実データを作らないので作るのが速いしストレージを食わない。 Viewに乗ったロジックが複雑だと所要時間が増えるので、その場合はTableの採用を考える。 CREATE VIEW AS ... が使われる。 Table モデルをTableとして実体化する。実データを作るので作るのに時間がかかるしストレージを食う。 ロジックが複雑であっても所要時間が増えない。BIから直接参照するとかならTableの採用を考える。 Up側にあるデータに追加や変更があっても反映されない。 CREATE (TRANSIENT) TABLE AS ... が使われる。 Incremental パイプラインを構築する際、「置き換え」なのか「増分」なのかは重要な観点となる。 初回生成時は通常のTableとして、2回目移行は増分だけを生成する。なので速い。 「増分」とする場合、どこが既存と増分の境界なのかを定義する必要がある。 その定義をモデルファイルに書く必要があり、若干複雑になる。 Ephemeral 中間テーブルをいちいちViewやTableに実体化し続けると、DWが再利用する可能性が低い何かで 埋め尽くされてしまうことがある。そういったテーブルはModularityの中で論理的に再利用したいが、 物理的に存在して欲しくない。それを実現できる。 Downstream側で1個か2個使うだけで、直接クエリを実行する必要がない場合に使うと良い。 モデルの例 動画では以下のクエリをdbt流に書く例が示されている。 以下は生クエリで、with句により3個のクエリを用意し4個目のクエリで結合している。 with customers as ( select id as customer_id, first_name, last_name from raw.jaffle_shop.customers ), orders as ( select id as order_id, user_id as customer_id, order_date, status from raw.jaffle_shop.orders ), customer_orders as ( select customer_id, min(order_date) as first_order_date, max(order_date) as most_recent_order_date, count(order_id) as number_of_orders from orders group by 1 ), final as ( select customers.customer_id, customers.first_name, customers.last_name, customer_orders.first_order_date, customer_orders.most_recent_order_date, coalesce(customer_orders.number_of_orders, 0) as number_of_orders from customers left join customer_orders using (customer_id) ) select * from final さて、次はdbtの論理レイヤの話。まずcustomersとordersをを別のファイルに記述する。 with customers as ( select id as customer_id, first_name, last_name from raw.jaffle_shop.customers ) select * from customers with orders as ( select id as order_id, user_id as customer_id, order_date, status from raw.jaffle_shop.orders ) select * from orders customersとordersを結合する。その際、customersとordersをref関数により参照する。 言い換えると、以下が評価される時点で、既にcustomersとordersは実体が存在する。 そのため、以下を評価する前に、customersとordersの中にあるデータを確認できる。 with customers as ( select * from {{ ref(\'stg_customers\')}} ), orders as ( select * from {{ ref(\'stg_orders\') }} ), customer_orders as ( select customer_id, min(order_date) as first_order_date, max(order_date) as most_recent_order_date, count(order_id) as number_of_orders from orders group by 1 ), final as ( select customers.customer_id, customers.first_name, customers.last_name, customer_orders.first_order_date, customer_orders.most_recent_order_date, coalesce(customer_orders.number_of_orders, 0) as number_of_orders from customers left join customer_orders using (customer_id) ) select * from final dbt runコマンドにより、全てのモデルをコンパイルする。 また --selectをつけると、、特定のモデルだけコンパイルできる。 dbtには下図のようにモデル間の依存関係をグラフィカルに表示する機能がある。 モデリングの方法論とdbtの仮定 dbtは単なるクエリの変換ツールであって、現実世界のモデルする方法論を規定するものではない。 しかし、動画ではモデルのセクションでチラッとこの話に触れられている。 Fact TableとDimensional Tableに整理してDimension毎のFactを提供しましょう、 という流儀や、Kimball,Data Vaultのような流儀など、諸々存在する。 動画では、これらをTraditionalと分類し正規化がポイントであると言っている。 コンピューティングとストレージが安くなり、ELTが可能になったおかげで、 \"正規化しない(Denormalized)\"モデリング手法が実現可能になりましたよ、と言っている。 むむ.. 相当深そうだが入り口だけ。 いったん、ELTを前提としてモデルを組み上げてください、ぐらいの話として聞き取る。 ELTを前提として、モデルをこうレイヤリングし名前を付けますよと話されている。 Source ELTであるが故に、外部のデータを無変換でDWに格納したところからスタートする。 そのレイヤを「Source」と名付ける。 Staging Sourceレイヤのモデルのクレンジングと標準化を行うレイヤを「Staging」と名づける。 StagingモデルはSourceモデルと1:1対応させる。 Intermediate Stagingモデルに対してビジネスロジックを適用して最終的なテーブルを生成する。 ビジネスロジックはダラダラと汚く大きくなることが常で、このレイヤで吸収する。 Fact 起きていること、または、既に起きたこと。履歴があるもの。 なんたらイベント、Webサイトにおけるクリック事象、投票など。 Dimension 時間の経過で発生するものではない履歴がない事象。 人、場所、コト、ユーザ、会社、製品、、顧客など。 以下は、動画で話されている実行例。 dbt-labs/jaffle_shop https://github.com/dbt-labs/jaffle_shop 一番左の緑色のモデルがSource。外部から引っ張ってきたRawデータ。 Sourceと1対1対応する形で、「stg_*」という名前のStagingモデルが存在する。 Sourceに対してクレンジング、標準化を適用したデータ。 「fact_*」は、時間経過により発生する履歴データ、そして、「dim_*」は属性データ。 Down側からUp側へ、Source->Staging->Fact/Dimension という流れになっている。 よもやま話(動画と関係なし) 例えばLaravel/EloquentやRails/ActiveRecordにはQuery Builderが付いていて、 手続き型言語によってクエリのModularityを得るアプローチには既視感があると勝手に思っている。 もちろんクエリ結果をDWに統合できないから、DWに統合できるF/Wが必要なので、 そのアプローチはSnowflakeで言えばSnowparkによって扱われるのかなと思う。 手続き型言語ではソフトウエア開発の複雑さから逃れられないことになっている。 ちなみにORMにおける物理テーブルの抽象化において「モデル」という用語が充てられている。 少なくとも1990年代後半にはORMの文脈で物理テーブルを抽象化する概念は実用化されていた気がする。 2000年代には、オブジェクト指向分析設計の文脈で「現実世界のモデル化」がイキりたおされていた。 2010年代、Laravel/Eloquent, Rails/ActiveRecordでORMのモダニズムが言われてた。 手続きに持ち込まず、SQLの世界だけでモデル化が話されるのは新しい気がする。 ここは最初の「Analytics Engineers」の話に由来している気がする。 まとめ dbt Fundamentalsの「Models」セクションを聴いて、内容を文書化してみた。 dbtはModularityと考え方に基づいてCTEによりクエリをバラすツールであることについて、 Modelの定義や実行例を追って確認してみた。

default eye-catch image.

ETLとELTの違いが説明されていたので聞いてみた話

サーバーサイドエンジニアのデータベースエンジニア寄りの枠ぐらいの認識しかないと、 データエンジニアリングの尖った部分に永遠に近づけないという感触がある。 サーバーサイドエンジニアの入口から入った場合はちゃんとチェンジしないといけないな。 dbtが公式で「データエンジニアリングの用語と概念」を長い尺で説明していて学んでみた。 サーバーサイドエンジニア的にはETLは普通のジョブやバッチ処理だと思うが、 ELT化することで何を改善しようとしていて何が達成されたのか説明できるようになった。 dbt Fundamentals https://courses.getdbt.com/courses/fundamentals [arst_toc tag=\"h4\"] ETLとELTの違い 旧来から分析基盤を作るためにデータを抽出してどこかに蓄え加工してDBに入れてきた。 これは、巨大なデータをダイレクトに入れる箱や資源がなくそれ以外の選択肢がなかったため。 これはもうサーバーサイド開発であってバッチ処理で必要なデータを揃えているのにあたる。 構成を実現するためのインフラレベルの観点、分析用途に加工するビジネスロジックの観点の2つ。 この2つをゴチャっと処理する巨大なソフトウエアを書かないといけなかった。 クラウドベースのDWが登場し、巨大なデータを入れる箱や資源が安く手に入るようになった。 そしてdbtのように「生データをうまいこと加工する」ツールが手に入るようになった。 抽出したままの生データを突っ込むのは簡単だし、ツールでうまいこと加工できるようになった。 もちろんSQLだけで加工するので出来ること出来ないことはあるが、 うまくいけば、誰も開発やメンテがしづらい巨大なソフトウエアを書かなくてもよくなった。 分析用ビジネスロジックの分離 dbt公式のdbt Fundamentalsの初っ端で、 抽出したデータをDWに投入するインフラレベルの人たちを「Data Engineers」、 生データをBIで必要なレベルまで加工する人たちを「Analytics Engineers」、 分析する人たちを「Data Analysts」と分類している。 実際、エンジニアが分析対象を理解しようとすると、キャリアの壁にぶち当たる。 エンジニアは技術力が必要だし分析者は要件定義能力が必要。 要件定義的なことができるエンジニアは圧倒的に不足しているのが世の常で、 エンジニアに対する要件から要件定義能力を分離したいというのは切実な思い。 dbt Fundamentalsで紹介されている下の表はよく出来ていると思う。 (フロントにETLを前提としたBIツールがあってTとLをUIでやらせる世界観だと 組み合わせたときにELTLになってしまう。トータルで考えないと上手くいかないと思う) Modern Data Stackとdbt dbtは「T」をやるツール。dbtの有名な図は以下。「E」と「L」は外。BIやMLは外。 ほぼSQLとymlだけで構成できるところが特徴。 SQLを使ってモデルを開発して、SQLを使ってテストして、ドキュメンテーションを構造化する。 WHがあって初めて存在するツールであって、WHの資源を使って動く。 オーケストレーションとDAGの管理が内包される。 まとめ インフラ的な観点とビジネスロジック的な観点の2つがゴチャった何かを作るのは大変という世界がある。 まぁそれでも書けよ、とも思うけど単にデータを抜いて格納するだけの人と、 要件定義してビジネスロジックを考えてデータを加工する人を分けられれば、人を集めやすい。 強力なDWが出来たことでデカいデータを生でぶち込めるようになったし、dbtみたいなツールができて ビジネスロジックに基づいて生データを加工するのが簡単になった。 トータルで複雑なソフトウエアを作らなくてもよくなるからELT美味しいよ、という話でした。