default eye-catch image.

GCSとのストレージ統合を設定した話

SnowflakeはS3, Blob, GCSを外部ステージとして設定し、データをロードする機能を備えています。 Snowflakeは各クラウドストレージとの接続方法として「ストレージ統合」の使用を推奨しています。 今回、GCSとのストレージ統合を設定し、外部ステージのロード先としてみました。 その手順等を感想を混ぜつつ書いてみようと思います。 参考にしたSnowflake公式ドキュメントは以下です。 ストレージ統合の設定方法がステップbyステップで示されています。 Google Cloud Storageの統合の構成 この記事は、公式が説明するステップに従って書いていきます。 [arst_toc tag=\"h4\"] GCSとストレージ統合 一般に、各ストレージサービスと連携する際、接続に必要な接続情報が必要です。 接続情報を自力で保管し使用する場合、セキュリティリスクに晒されるため、 Snowflakeはよりセキュアな方法として「ストレージ統合」機能を提供しています。 現在、Snowflakeは「ストレージ統合」機能のみを推奨しています。 各クラウドプロバイダ毎に実装方法は異なり、GCSの場合、Service Accountを使用します。 クラウドプロバイダ間で概念が異なるため 「AWSでいうところのXXXだよね」 は誤解を生みますが、 簡単に言えば、AWSのIAM、Azureのサービスプリンシパル、が相当すると思います。(本当??) 以下の図はその概念図です。(公式直リンク) Snowflakeは、ストレージ統合オブジェクトをGCS用のサービスアカウントと紐付けます。 このサービスアカウントは裏でSnowflakeが作成します。 言い換えると、GCSの認証責任をSnowflakeが作成するサービスアカウントに委任します。 Snowflake上のロード・アンロード操作の裏で透過的にサービスアカウントの権限参照が行われます。 Snowflakeでストレージ統合を作成する Snowflake上でストレージ統合を作る方法は以下の通りです。 実行にはACCOUNTADMINロールか、CREATE INTEGRATION権限が必要です。 ストレージ統合を作成する段階では、GCP側の権限設定は不要です。 CREATE STORAGE INTEGRATION gcs_int TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = \'GCS\' ENABLED = TRUE STORAGE_ALLOWED_LOCATIONS = (\'gcs://mybucket1/path1/\', \'gcs://mybucket2/path2/\') STORAGE_BLOCKED_LOCATIONS = (\'gcs://mybucket1/path1/sensitivedata/\', \'gcs://mybucket2/path2/sensitivedata/\'); 自動生成されたサービスアカウントの情報を取得 CREATE INTEGRATIONによって作られたSnowflake用のサービスアカウント情報を取得します。 オブジェクトの詳細情報を取得する DESC コマンドにより取得できます。 DESC STORAGE INTEGRATION gcs_int; +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+ | property | property_type | property_value | property_default | +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------| | ENABLED | Boolean | true | false | | STORAGE_ALLOWED_LOCATIONS | List | gcs://mybucket1/path1/,gcs://mybucket2/path2/ | [] | | STORAGE_BLOCKED_LOCATIONS | List | gcs://mybucket1/path1/sensitivedata/,gcs://mybucket2/path2/sensitivedata/ | [] | | STORAGE_GCP_SERVICE_ACCOUNT | String | service-account-id@project1-123456.iam.gserviceaccount.com | | +-----------------------------+---------------+-----------------------------------------------------------------------------+------------------+ STORAGE_GCP_SERVICE_ACCOUNTプロパティが、作成されたサービスアカウントです。 バケットオブジェクトにアクセスするためのサービスアカウント権限を付与する 最後に、サービスアカウントがバケットへアクセスするための権限設定を行います。 ここで初めて、GCPコンソール上での操作が必要となります。 カスタムIAMロールの作成 以下の権限を持つIAMロールを作成します。今回はロードのみを行うため以下を設定します。 パージを行う場合はobjects.delte、アンロードを行う場合はobjects.createが必要です。 storage.buckets.get storage.objects.delete storage.objects.get storage.objects.list サービスアカウントへIAMロールを割り当てる GCPのやり方に従って、サービスアカウントへIAMロールを割り当てます。 バケットに対して、サービスアカウントのアクセスを許可し、その許可内容をIAMロールで指定します。 Azureと比較するとAWSとGCPは近いなと感じます。 外部ステージの作成 まず、ステージオブジェクトを格納するSnowflakeデータベースへの権限付与を行います。 GRANT USAGE ON DATABASE mydb TO ROLE myrole; GRANT USAGE ON SCHEMA mydb.stages TO ROLE myrole; GRANT CREATE STAGE ON SCHEMA mydb.stages TO ROLE myrole; GRANT USAGE ON INTEGRATION gcs_int TO ROLE myrole; 次にストレージ統合を指定して外部ステージオブジェクトを作成します。 USE SCHEMA mydb.stages; CREATE STAGE my_gcs_stage URL = \'gcs://mybucket1\' STORAGE_INTEGRATION = gcs_int FILE_FORMAT = my_csv_format; 最後に疎通試験を行います。 copy into testtable from @my_gcs_stage pattern=\'testdata.csv\'; まとめ Snowflakeの公式ドキュメントの通りにGCSとのストレージ統合を作成しました。 また、作成したストレージ統合上に外部ステージを設定し、ロードが出来ることを確認しました。

default eye-catch image.

デプロイメントについて調べてみた話(端折り気味)

dbt Fundamentalsの「Deployment」のセクションについて理解した内容を言葉にしてみた。 自動化を解決するのにdbt Cloudを使用した記事となっている。この記事は前回の続き。 [clink url=\"https://ikuty.com/2023/08/23/dbt_docs/\"] いったんdbtそのものの理解に留めたいので、かなり端折気味。 [arst_toc tag=\"h4\"] デプロイメントとは 以下、動画の意訳。 開発環境であればIDE上でコマンド(dbt run,dbt test)をAd-Hocに必要に応じて実行してきた。 本番環境へのデプロイメントは異なる。本番環境用に分離されたブランチ(main,master?)が存在し、 そして分離されたスキーマが存在する。(開発環境と異なり)意思決定に必要なデータが存在する。 また、本番環境では(Ad-Hocではなく)dbtコマンドをスケジュール実行する。 ビジネス要件に応じてdbt run, dbt testを実行することになる。 本番環境は、分離されたブランチ・スキーマ上においてSSOT(Single Source Of Truth)を実現する。 そして、これまで作業してきた開発環境を分離し、様々な変更を本番環境への影響なしに実行する。 Snowflakeの本番検証環境分離の戦略 これ、動画にないのだけれど、そもそもSnowflake側を検証環境と本番環境に分離しないといけない。 以前、以下の記事にマルチテナントに関係するホワイトペーパーを読んで内容を整理してみたが、 そこで、環境を分離するにあたって必要な考慮事項がまとめられている。 [clink url=\"https://ikuty.com/2023/03/29/snow-multitenant-design/\"] 概ね、テナント=環境と捉えて問題ないという感想を持っている。 OPT(Object Per Tenant)、つまり、同一アカウント内に検証/本番環境を共存させるスタイルと、 APT(Account Per Tenant)、つまり、アカウント毎に検証/本番環境を分離するスタイル、 両方あると考えている。 Snowflakeにおいて、アカウントを跨いでオブジェクトを共有できないことがポイントで、 それを不自由な制限事項と取るか、確実な安全の確保と取るか、どちらかの選択となる。 OPTであれAPTであれ、オブジェクト数の増加に伴いTerraform等のツールが事実上必須となる ことから、実はAPTでも事実上の共有をしているのと同じとなり、APTの方が良いのかなと思っている。 URLが分離するので運用の調整がしやすいし、第一に安全性にグレーな部分が無くなる。 dbt Cloud上でのデプロイメントの準備 dbt Coreの各コマンドをスケジュール実行することで機能を実現する、という特徴から、 そもそも検証/本番環境を実現するためにはdbt Coreだけでは機能が足りない。 dbt Cloudには、検証/本番環境の実現に必要な機能が乗っている。 なので、このDeploymentセクションはdbt Cloudの機能説明に近い。 ちょっと記事の趣旨から外れてしまうのでサラッと眺めるに留める。 dbt Cloud上のDeploymentメニュー配下には以下のメニューがある。 - Run History - Jobs - Environments - Data Sources Environmentsから、Deployment単位に対する操作が行える。 Create Environmentの操作により、新たなEnvironmentを作成できる。 - General Settings - Name - Type - dbt Version - [Check] Only run on cuustom branch データプラットフォーム(DW)との接続に必要な設定を行う。 DW側がアカウント分離方式で環境分離しているのであれば、ここで本番検証の差異が発生する。 - Deployment Connection - (Overwrite connection settings for this environment, only certain fields are able to be overwrite ) - Account - Role (Optional) - Database - Warehouse 接続に必要なCredentialsの設定を行う。環境分離されている粒度で接続を行う。 - Deployment Credentials - (Enter your deployment credentials here, dbt will use these credentials to connect to your database and run scheduled jobs in this environment ) - Auth method - Username - Password - Schema ジョブ実行時の設定を行う。タイムアウト設定や、都度ドキュメントを作るか、都度source freshnesを判定するか、 また、そもそも何のコマンドを実行するか。 - Execution Settings - Run Timeout - (Number of seconds a run will execute before it is canceled by dbt Cloud. Set to 0 riever time out for this job) - Defer to a previous run state ? - Generate docs on run - (Automatically generate updated project docs each time this job runs) - Run source freshness - (Enables dbt source freshness as the first step on this job, without breaking subsequent jobs) - Commands - (This is where you can pass whatever dbt commands you would like. So this could be dbt run, dbt tests, dbt seed, whatever it might be) 省略... - Helpful Resouruces - Enabling CI - Souurce Freshness dbtがどういったトリガでジョブを起動するかを設定する。 結構いろいろなことができる。スケジュール実行だけでなくWebhook、APIなんかも設定できる。 細かいところは省略。 - Triggers - Configure when and how dbt should trigger this job. - Schedulue - [Check] Run on Schedule - [Radio] Schedule Days - Subday,Monday,Tuesday,Wednesday,Thursday,Friday,Saturday - [Radio] Enter custom schedule - Timing - Every [??] hours (Starting at midnight UTC) - At exact intervals [??] UTC (e.g.\"0,12,23\" for midnight,noon,and 11PM UTC) - Webhooks - [Check] Run on Pull Requests ? - (省略) - API (省略) デプロイメント まとめ Historiesから、Jobの実行の実行履歴を観察できる。 以下、上からGit RepositoryからCloneした後、Snowflakeとの接続諸々を実行、 dbt deps (dbt Fundamentalsで出てきてないが、モジュール化されたパッケージのインストール)を実行。 まとめ dbt Fundamentalsの動画を聴いて、dbtのデプロイメント機能を追ってみた。 dbtそのものを追いたいので大分端折ってしまった。あまり記事に意味がないかもしれない。 自分の経験が追いついてきたらもう一度トライしてみたい。

default eye-catch image.

dbtのドキュメント生成機能について調べてみた話

The dbt Viewpointにおいて、dbtはソフトウエア開発のシナリオで解決されてきた諸々を考慮した F/Wであることが説明されている。その中で、バージョン管理、継続的なテストによる品質保証、そして、 ドキュメンテーションの重要性が改めて説明されている。dbtの対象はアプリケーション開発と地続き。 The dbt Viewpointについて以下の記事で書いてみた。 [clink url=\"https://ikuty.com/2023/08/23/dbt-viewpoint/\"] dbt Fundamentalsの「Documentations」セクションを読んで理解した内容を書いてみる。 [arst_toc tag=\"h4\"] なぜドキュメンテーションが重要なのか? dbtが想定するAnalytics Engineeringの新ワークフローで色々解決しようとしている。 dbtが備えるドキュメンテーション機能が存在する理由について言及されている。 ちょっとどうかな..と思うことが多かったが言及されている内容をまとめてみる。 ユーザによる自己解決 ソフトウエア開発のシナリオでは、ドキュメントは主に開発者間のコラボレーションが目的だが、 Analytics Engineeringの世界ではデータの作成者と消費者の間のコラボレーションが目的。 データの作成者と消費者がデータの仕様を共有することは実際の運用で最も重要だと思う。 例えば「どうやって計算したのか」、「どこから持ってきたのか」といった各自の疑問を自己解決して もらいたい、というモチベがあると言及している。 成果物とドキュメントの統合 ソフトウエアの成果物とドキュメントが分離すると管理が面倒になる。 ソフトウエアのアップデートにドキュメントが追いつかず嘘八百となることは良く観察される。 dbtは、ドキュメントを成果物の一部とすることでその問題を解決することを言及している。 記述の簡便さ YAMLで書けるので新しい文法の理解は不要としている。 ただしYAMLは厳密なので、それはそれで定義を理解する必要はあると思う。 既存のjavadoc的何かとの比較ではなく、何も無いところとの比較ではその通り。 dbtのドキュメンテーション では、dbtでそれをどうやるのか。 DAGの観察 dbtには、Sourceから最後のモデルまでの流れを自動的に可視化する機能がある。 全てのmodelの依存関係をSoruceから順番に示してくれる。 特に意識せずとも、model内のref関数が依存関係を示す情報として使われる。 動画では\"lineage\"ではなく\"DAG\"と言われている。 Model内のドキュメント dbtには、modelファイルに特定のテキストを記述することでモデルに情報を付与する機能がある。 どこか別のリポジトリではなく、modelファイルに直接書けることが良いのだよ、と言われている。 modelを書いた後、一呼吸おいて別のところで書くのではなく、modelと一緒に書くのだよとか。 これどうだろうな...。急いでたらSQLだけ書いてコメント直さないなんてありそうだが。 Modelのドキュメント 直書きとdoc blocks STAGING層にある2つのモデルが例として挙げられている。 model定義の並び、column定義の並びにそれぞれdescriptionを直書きスタイルで配置している。 version: 2 models: - name: stg_customers description: Staged customer data from our jaffle shop app. columns: - name: customer_id description: The primary key for customers. tests: - unique - not_null - name: stg_orders description: Staged order data from our jaffle shop app. columns: - name: order_id description: Primary key for orders. tests: - unique - not_null - name: status description: \'{{ doc(\"order_status\") }}\' tests: - accepted_values: values: - completed - shipped - returned - placed - return_pending - name: customer_id description: Foreign key to stg_customers.customer_id. tests: - relationships: to: ref(\'stg_customers\') field: customer_id もう1つの方法として、docブロック( {{ doc() }} )を使う方法が説明されている。 上のstatusのdescriptionようにdoc()の中に識別子を書いて、 別ファイル(.md)の中でその識別子の詳細を書く。要は、長いテキストを定義の外に書くスタイル。 (さっき、同じファイルに書くところが良いって言ってたばかりなのに...) {% docs order_status %} One of the following values: | status | definition | |----------------|--------------------------------------------------| | placed | Order placed, not yet shipped | | shipped | Order has been shipped, not yet been delivered | | completed | Order has been received by customers | | return pending | Customer indicated they want to return this item | | returned | Item has been returned | {% enddocs %} Sourceのドキュメント modelの場合と同様に、source定義,table定義,column定義にdescriptionを配置している。 version: 2 sources: - name: jaffle_shop description: A clone of a Postgres application database. database: raw schema: jaffle_shop tables: - name: customers description: Raw customers data. columns: - name: id description: Primary key for customers. tests: - unique - not_null - name: orders description: Raw orders data. columns: - name: id description: Primary key for orders. tests: - unique - not_null loaded_at_field: _etl_loaded_at freshness: warn_after: {count: 12, period: hour} error_after: {count: 24, period: hour} ドキュメントの生成と閲覧 動画ではdbt Cloudの例が説明されていて、IDE上でシームレスにドキュメントが表示されている。 説明されているもので全部ではなさそうで、きっとFundamentals用に概要が話されている。 動画で説明されているドキュメントの項目は以下の通り。 - dbt docs generate でドキュメントを生成する - ymlファイルとその他のファイルに付加した情報がドキュメントに出力される - (dbt Cloudでは) View Docsボタンの押下により生成されたドキュメントを閲覧できる - 左ペインにプロジェクト内のファイル階層が表示される。(ModelやSourceが表示される) - 左ペインからModel(yml)を選択すると、そのymlと対応するドキュメントが表示される - Detailとして、TAGS,OWNER,TYPE,PACKAGE,LANGUAGE,RELATION。 - DESCRIPTIONとして、ymlに付加したdescription。 - descriptionがdoc blocksの場合、押下で展開される。 - Columnsとして、列名,型,description,TESTの有無,追加の情報 - Referenced Byとして、どのモデルから参照されているか(つまり下流にあるモデル) - またはどのテストからそのモデルが参照されているか - Depends Onとして、どのモデルに依存しているか(つまり上流にあるモデル) - Codeとして、ymlファイルの中身、と、コンパイル済みコード - yml内のjinjaテンプレートがコンパイル済みコードで展開されている。便利。 - model選択中に右下の View Lineage Graphボタン押下でmodelを含むリネージが表示される - プロジェクトレベルで View Lineage Graphボタン押下で全体のリネージが表示される まとめ dbt Fundamentalsの動画を聴いて、dbtのドキュメント自動生成機能を追ってみた。 Model、SourceにDescriptionを付与し、自動生成ドキュメントに出力されることを確認した。 dbt Cloudでしか試していない。dbt Coreで出すとファイルが吐かれるのだろうか。 ちょっと流石にこれだけだと機能が足りない気がする。 きっと他で補完は必要だと思う。読めてないだけで本体に何かあるのかもだけれども いったんFundamentalsの読みは終了。

default eye-catch image.

The dbt Viewpointを読んでみた話

古今東西、フレームワークは過去の不合理や不便さに対する解決策として生まれていると思う。 特に、アプリケーション開発のシナリオで圧倒的な生産性の向上を生み出す何かが生まれたりする。 それは過去があまりにも酷かったのもあるし、リアーキテクチャのセンスが天才的なものもあると思う。 dbtは過去の分析プロジェクトで得られたいくつかの論点を解決するように作られている。 dbtは分析における様々なタスク(以下、ワークフロー)を効率よく解決するためのツールである。 分析用ではあるが、モダンなアプリケーション開発フレームワークが解決した多くの論点に基づいて 開発されている。その由来の詳細について「The dbt Viewpoint」で説明されているので読んでみた。 [clink implicit=\"false\" url=\"https://docs.getdbt.com/community/resources/viewpoint\" imgurl=\"https://d33wubrfki0l68.cloudfront.net/38a468b6bb7aeae799ea43bb9510af0753b90d91/42e9b/img/dbt-logo.svg\" title=\"The dbt Viewpoint\" excerpt=\"In 2015-2016, a team of folks at RJMetrics had the opportunity to observe, and participate in, a significant evolution of the analytics ecosystem. The seeds of dbt were conceived in this environment, and the viewpoint below was written to reflect what we had learned and how we believed the world should be different. dbt is our attempt to address the workflow challenges we observed, and as such, this viewpoint is the most foundational statement of the dbt project\'s goals.\"] 訳と文章作成4時間以内のスピードチャレンジなので間違っていたらすみません。 原文は平易かつ丁寧な表現で面白いので読んでみてください。 [arst_toc tag=\"h4\"] dbtの由来 2015-16年, RJMetrics社から派生した分析プロジェクトは、分析のエコシステムにおける重要な発展 を観察し参加する機会を得た。dbtの種となる考え方はこの環境で考案された。 The dbt Viewpoint(この記事)に書かれている論点は、チームが学んだこと、 そして他がどう変わるべきと信じているか、について反映させるために書かれている。 dbtは、私たちが観察したワークフローの課題に対処するための私たちの試みであり、 (この資料に書いた)論点は、dbtプロジェクトのゴールを示す最も基本的な記述である。 今日のAnalytics 真の成熟した分析組織においては、プロプライエタリソフトウエアが過去のものとなり、 データ統合を行うコードとツール、ハイパフォーマンス分析データベース、SQL/R/Python、 可視化ツール、を組み合わせてソリューションを組む流れに進んでいる。 この変化により、重要な可能性を見出せるようになったが、分析チームは依然として、 引き続き、高品質、低遅延に対する高い目標に対峙しないといけない。 我々は分析チームはワークフローに関する問題を抱えていると信じている。 分析者はしばしば離れた環境で操作を行い、結果として部分的な最適化を生み出している。 結果、ナレッジはサイロ化されてしまう。同僚が既にどこかで書いたコードを再生産したりしている。 あまり馴染みのないデータのニュアンスを掴めない。 洗練されたチームであっても解決できないことを何百回も聴き、概して現状維持だと認識している。 組織の決定のスピードと品質は上がらない。 分析はこのようである必要はない(?)。実際、これらの問題についてソフトウエア開発チームが 既に解決している。ソフトウエアエンジニアが早く高品質で成果物を生産するために行なっている 多人数共同開発と同じテクニックを分析にも適用できる。 Analyticsは共同作業的 成熟した分析チームが持つべきテクニックやワークフローはソフトウエア開発に見られる以下の 共同作業的な特徴を持つべき。 バージョンコントロール 分析を行うコードは、それがPythonなのか、SQLなのか、Javaなのかそれ以外なのかによらず、 バージョンコントロールされるべきだ。分析はデータとビジネスが変化していく中で、 誰が何をいつ変更したのかを知ることが重要になる。 品質保証 悪いデータは悪い分析につながる。そして悪い分析は悪い決定につながる。 データや分析結果を作り出すコードはレビューされ、テストされるべきだ。 Bad data can lead to bad analyses, and bad analyses can lead to bad decisions. Any code that generates data or analysis should be reviewed and tested. ドキュメンテーション あなたが書いた分析はソフトウエアアプリケーションである。他のソフトウエアアプリケーションと同様に 他の誰かが「それをどのように使うのか」疑問を抱く。これは簡単に見えるかもしれないが、 (例えば?)\"収益線\"に様々な意味がある可能性があるように、そう簡単ではない。 コードには、それがどのように解釈されるべきか、基本的な記述を付与すべきだ。 また、疑問が生まれる度に、チームメンバが既存のドキュメントに追加するべきだ。 モジュール性(Modularity) もしあなたが、あなたやあなたの同僚が会社の収益について一連の分析を行うのであれば、 あなたがたは同じ入力データを使うべきだ。「コピペ」は良い方法ではない。 背後にあるデータが変化したとして、そのデータが使われる全ての箇所を変更しないといけない。 代わりに、パブリックなインターフェースとしてデータのスキーマを考慮すべきだ。 一貫性のあるスキーマを公開して、ビジネスロジックが変更されたときに一緒に変更できるような テーブル、ビュー、や他のデータセットを作ること。 Analyticsのコードはアセットである その分析を得るために要したコード、プロセス、ツールは組織が行った中心的な投資である。 成熟した分析組織のワークフローは次に示す特徴を備えるべきで、それにより組織の投資を 守って育てるべきだ。(so as to を意訳した) 環境 Analyticsは複数の環境を必要とする。分析者はユーザに影響を与えずに作業する自由を必要とするが、 一方で、ユーザは彼らの普段の仕事を行うため、依存するデータを信頼できるようにサービスレベルの 保証を必要とする。 サービスレベルの保証 分析チームは本番環境に投入された全ての分析の精度に責任をもつ必要がある。(stand behind.) エラーは本番環境におけるバグと同じレベルの緊急性をもって扱われるべきだ。 本番環境から不要になった全てのコードは非推奨となるプロセスで扱われるべきだ。 保守性を考慮した設計 ソフトウエア開発における大半のコストは保守フェーズに発生する。 このため、ソフトウエアエンジニアは保守性の観点を持ってコードを記述する。 しかしながら、分析コードはしばしば(ソフトウエア開発のものより)扱いづらいものである。 背後にあるデータの変化により、予測し修正することが難しい形で大半の分析コードは壊れてしまう。 分析コードは保守性の観点をもって書かれるべきだ。スキーマとデータに対する将来の変化を予測し、 対応する影響を最小限に抑えるためにコードを書くべきだ。 分析ワークフローは自動化ツールを必要としている しばしば、多くの分析のためのワークフローは手作業で行われる。 分析者は、sourceからdestinationへ、stageからstageへの移送手段構築にほとんどの時間を使う。 ソフトウエアエンジニアは彼らの仕事で起こる手作業の部分の広範囲をカバーするツールを構築しているう。 私たちが推測している分析用途のワークフローを実装するために、似たようなツールが必要となるだろう。 以下は自動化されたワークフローの一例。 このようなワークフローは単一のコマンドで実行できるように構築されるべき。 modelと分析は複数のソース管理リポジトリからダウンロードされる コードは与えられた環境に適合して構成される コードはテストされる、そして コードはデプロイされる まとめ The dbt Viewpointを読んで訳してみた。 アプリフレームワークで開発経験があると、かなり似た雰囲気を感じるツールなのだが、 どうやら本当にアプリ開発の分野にインスパイアされたツールだったということがわかった。

default eye-catch image.

テストとDAGの構築について考えてみた話

dbt Fundamentalsの「Tests」のセクションについて理解した内容を言葉にしてみた。 Models,Sourcesの各セクションと比較して1行が含む意味が増えている印象がある。 基礎的な内容をインプットするには結構なスローペースだとは思うが抑えていこうと思う。 この記事は前回の記事(以下)の続き。 [clink url=\"https://ikuty.com/2023/07/23/dbt_sources/\"] これだけで「データ品質」をどうこうするには足りないと思う。 データ品質に関わる何かについてはどこかで入門して記事化しようと思います。 最後の2節は「それってあなたの感想ですよね」と言って読み飛ばしてください。 [arst_toc tag=\"h4\"] Generic Tests Generic Testsは、Assertionの定義をyaml形式で記述するタイプのテスト。 dbtには4つのGeneric Testsがビルトインで定義されていて、特別なインストール不要で利用できる。 Generic Testsに関係する公式のリファレンスページは以下。 About tests property Assertの定義を書いたyaml形式のファイルをmode-pathに配置し、dbt testコマンドで実行する。 not_null 指定したテーブル・カラムの全ての値がNULLでないことをAssertする。 version: 2 models: - name: orders columns: - name: order_id tests: - not_null unique 指定したテーブル・カラムがユニークであることをAssertする。 OPTIONALな属性であるconfigにより、Assertの範囲を追加することができる。 ersion: 2 models: - name: orders columns: - name: order_id tests: - unique: config: where: \"order_id > 21\" accepted_values 指定したテーブル・カラムにvaluesで指定した値のみが存在することをAssertする。 OPTIONALな属性であるquoteにより、values内に整数値、ブール値などを配置できる。 まぁ\"false\"とfalseは違うし、\"100\"と100は違うので、そういう配慮。 version: 2 models: - name: orders columns: - name: status tests: - accepted_values: values: [\'placed\', \'shipped\', \'completed\', \'returned\'] - name: status_id tests: - accepted_values: values: [1, 2, 3, 4] quote: false relationships 参照整合性制約、つまり外部キーが参照するレコードが相手先テーブルに存在することをAssertする。 公式には、childテーブルとparentテーブルという表現が出てくる。 This test validates that all of the records in a child table have a corresponding record in a parent table. This property is referred to as \"referential integrity\". これは、yaml上の上位階層と下位階層の関係を言っていて、以下であればordersのcustomer_idと customersのidの間で参照整合性が成立するかをAssertする。 toの先として、ref関数、またはsource関数を指定する。 version: 2 models: - name: orders columns: - name: customer_id tests: - relationships: to: ref(\'customers\') field: id dbt test dbt testコマンドによりテストを開始する。 Test an expression 上記までは特定のカラムに対するAssertionを定義するものだった。 もし複数カラムを使用したAssertionを定義しようとすると、columns:以下に書けない。 この場合、models:直下にcolumns:ではなくtests:を配置することができる。 以下は(ちょっと不思議)、ordersテーブルにある「country_code」と「order_id」をハイフンで繋いだ 文字列をカラム名として持つカラムがユニークであることをAssertする。 例はアレだけど、式の評価(expression)結果をAssert定義に使用できる、といったところ。 version: 2 models: - name: orders tests: - unique: column_name: \"country_code || \'-\' || order_id\" Use custom generic test 自力で実装したテストをビルトインテストの代わりに使用することができる。 この記事では詳細は省略。 version: 2 models: - name: orders columns: - name: order_id tests: - primary_key # name of my custom generic test Singular Tests Assertをyml形式で記述するスタイルの他に、単体のSELECT文形式で定義するスタイルもある。 SELECT文を1個書いたファイルをtest-pathに配置する。 SELECT文が1件以上のレコードを返す場合、そのテストは失敗、という扱いになる。 dbt Fundamentalsの動画では1例だけサラッと説明されている。 -- Refunds have a negative amount, so the total amount should always be >= 0. -- Therefore return records where this isn\'t true to make the test fail. select order_id, sum(amount) as total_amount from {{ ref(\'stg_payments\') }} group by 1 having not(total_amount >= 0) Source Tests ModelだけでなくSourceに対してもテストを書ける。(テストを書ける対象は他にもある) dbt Fundamentalsの動画に出てくる例では、前節で説明されていたSourceにテストを追加している。 version: 2 sources: - name: jaffle_shop database: raw schema: jaffle_shop tables: - name: customers columns: - name: id tests: - unique - not_null - name: orders columns: - name: id tests: - unique - not_null loaded_at_field: _etl_loaded_at freshness: warn_after: {count: 12, period: hour} error_after: {count: 24, period: hour} testを実行するコマンド Generic tests、Singular testsを実行するコマンドは以下の通り。 dbt Fundamentalsには一部しか載っていない。(★を付けた) # ★ run all generic and singular tests in your project. $ dbt test # ★ run only tests defined singularly $ dbt test --select test_type:singular # ★ run only tests defined generically $ dbt test --select test_type:generic # ★ run source test $ dbt test --select source:test_name # run tests for one_specific_model $ dbt test --select one_specific_model # run tests for all models in package $ dbt test --select some_package.* # run singular tests limited to one_specific_model $ dbt test --select one_specific_model,test_type:singular # run generic tests limited to one_specific_model $ dbt test --select one_specific_model,test_type:generic dbt buildとDAGの仕組み 先に作ったモデルを使って後のモデルを作る、という仕組みを考えようとすると、 先の実行が上手くいかなかった場合に、後の処理を止めたいという欲求は必ず発生する。 dbt buildコマンドにより、上流側でrunとtestを行い、その結果に基づいて 下流側のrunとtestを実行する、という仕組みを作ることができる。 # hogeモデルとその上流のDAGを構築して実行 $ dbt build +hoge これ自力で作ったら大変だし、本当にその通り動くのか確認するのも大変。 たぶんこれがdbtを使う大きなメリットなんだろうな。 dbt FnudamentalsのTestsセクション内のThe dbt Build commandというパートで、 実例を使ってDAGが最後まで通るパターン、途中でテストがNGになって止まるパターンの両方が すごくライトな感じで説明されていてイメージ掴むにはとても良かった。 公式により詳細な記述があるので追ってみる。 About dbt build command (感想1)ソフトウエア開発と品質保証 昔、とあるメーカーで医療機器・システムの設計・実装・保守を長いことやっていたことがある。 医療機器が医療機器であるためには、様々な法令に基づいた品質保証を行う必要がある。 おそらく設計・開発よりも品質保証と保守のウェイトが圧倒的に高かっただろう記憶がある。 患者、医療従事者ほか、品質が保証されていないとヤバい人たちのために品質を保証する。 それはほぼ自分達のためであり、こんな具合にエコシステム全体で品質保証を言ってた。 言いたいのは、綺麗事でテストしていたのではなく明日の飯のためにテストしていた。 dbt Fundamentalsでは「データは信頼に値する品質である必要がある」みたいに書かれている。 データ分析はデータに基づいてアナリストが分析した結果で意思決定する人がいる。 嘘八百並んだ虚構のパイプラインを構築してデータ分析とか、まぁないよねと思う。 というか綺麗事抜きに持続的にお仕事もらえないよね、と思う。 (感想2)品質保証の品質は誰が保証するのか dbt FundamentalsにはAnalytics Engineering版の自動テストをやりましょう、 という世界観が書かれている。 個人的には、品質保証がコアパートであるプロダクトとOSSの相性はあまり良くないと感じている。 それは、品質保証の品質を保証するコストが高すぎるから、なのだろうなと勝手に思っている。 dbtでは、Analytics EngineerはData EngineerとAnalystの間に位置付けられ、 もしこれがビジネスに寄ったエンジニアなのであれば、どこまで寄ったとしても埋まらない溝はある。 分析要件を熟知したアナリストがdbtを使うのであればワンチャン行ける気がするが大変だろう。 ymlで書いたTestが保証する範囲をエンジニアとアナリストが連携できるのか。 Analytics Engineerって、本当に実在できるのだろうか。むずくね?、と思う。 結果が致命的になるドメインとか、あまりにほ品質保証のコストが高いドメインとか、 おそらく、プロの品質保証家がいるはずだろうから、彼らの指示に従うべきで、 そうではない「普通の」品質保証レベルを効率的に達成するためのCI/CDだと思う。 まとめ dbt Fundamentalsの「Tests」セクションを聴き、公式の情報で補足しながら内容をまとめた。 4つのビルトインテストの仕様を理解した。dbtにおけるDAGの構築の仕組みについて公式を読み、 理解した内容をまとめてみた。 大きなトピックである「データ品質」について、どこかで入門したいと思う。

default eye-catch image.

ソースについて考えてみた話

dbt Fundamentalsの「Sources」のセクションについて理解した内容を言葉にしてみた。 かなりの部分で感想を織り交ぜてみた。この記事は前回の続き。 [clink url=\"https://ikuty.com/2023/07/15/dbt_models/\"] dbtはModularityのコンセプトに基づいてクエリをバラし上流から下流へと続くデータフローを 構築するフレームワーク。上流から下流までどのようにバラしても自由ではあるが、 何度か似たような仕組みを作っていくと、共通して見られる特徴に気づくことがある。 例えば、データを取り込んで、加工して綺麗にして、中間処理をして、BIやMLに渡す、といったように、 機能単位で処理をレイヤ化しておくと見通しが良くなることに気づく。 dbtはそのバラし方についてある程度規定していて以下のようにレイヤリングすることが示されている。 Source Staging Intermediate Fact Dimension この記事はSourceレイヤについての記事。 [arst_toc tag=\"h4\"] 外部インターフェースとDRY原則 データ分析基盤の構築に限らず、システムを設計する際に外部データとのインターフェースを 独立して検討が必要な設計項目とすることが多い気がしている。 それは何故かというと「自分たちの管理外にあるものであって変わり続けるもの」だからだと思う。 インターフェース仕様を定義し変化を制御しようとする。制御下にある変化を受け入れる必要がある。 変化する対象をハードコードした場合、変化が起きたときに全てを変更する必要がある。 もし対象を1箇所に記述しそれを論理的に参照する仕組みがあれば、変化に対する変更は1回で済む。 実際には、想定する粒度や概念の範囲を逸脱した場合は書き直しが必要だろうと思うので、 それは見通しが甘かったことに対する罰として受け入れないといけない。 インターフェース仕様の想定の甘さはシステム内部のそれと比べて影響が大きい。 モダンな言語やフレームワークでは、同じ変更を何度も行わせるようなタルいことをさせないように 作られている。これはDRY(Don\'t Repeat Yourself)原則と言い一般的な設計論として話される。 dbtはELTを前提としていて、外部データはそのままテーブルにロードすべし、としている。 dbtにとって、外部データをロードした一番最初のテーブルが「生」であり変化し得るが、 DRY原則に従って、1箇所で定義し抽象化したものを使いまわせるようになっている。 Sourceの定義と利用 さて、dbtのSourceによってどのようにDRYが実現できるか見てみる。まずは生クエリ。 CTEにより記述され、1個目のsourceを2個目のstagedで使用している。 ikuty_raw.jaffle_shop.customers が生データを格納したテーブル。 ハードコードされた状態。 with source as ( select * from ikuty_raw.jaffle_shop.customers ), staged as ( select id as customer_id, first_name, last_name from source ) select * from staged ; 次にdbt。model-pathにsource.ymlというファイルを配置する。 生クエリのsourceがYMLで定義されている。jaffle_shopという名前のSourceを指定している。 スキーマやテーブルが変更されたとしても、このファイルを変更すれば良い。 version: 2 sources: - name: jaffle_shop database: ikuty_raw schema: jaffle_shop tables: - name: customers - name: orders 生クエリをモデル上で以下のように書き直す。 この際、上で定義したjaffle_shop Sourceを {{source()}} により参照している。 select id as customer_id, first_name, last_name from {{ source(\'jaffle_shop\', \'customers\') }} 古いデータに基づく分析から得られた意思決定はゴミ? データ分析界隈では「データの新しさ」がしばしば重要なトピックとなる。 しかし、それはなぜなのか上手い説明を聞いたことがなかった。 dbt公式が用意するドキュメントの中に、dbtが生まれた経緯が説明されているものがあり、 その中で、風が吹けば桶屋が儲かる的なノリでこう書いてある。 explicitにデータ鮮度の重要さを示すものではないが、 「古いデータに基づく分析から得られた意思決定はゴミ」ぐらいの気持ちになった。 The dbt Viewpoint https://docs.getdbt.com/community/resources/viewpoint Quality Assurance Bad data can lead to bad analyses, and bad analyses can lead to bad decisions. データ鮮度の定義と実行 データ分析基盤には、データを定期的に取り込む類のタスクがある。 dbtは取り込みの度に取り込んだデータの鮮度を確認できる機能を備えている。 「定期的」とは、serviceやdaemon的な何かが動いてデータソースを見にいく訳ではなく、 dbtを実行して取り込む度に毎度値を参照するという意味。 物理的には、「新鮮であると見做すことができるレコードと現在との間の許容可能な時間」 を定義し、許容できない時間差を検知することでデータが古いのか新しいのかを判断させる。 この辺り、Declarativeであり、新しい何か的な印象を持つ。 sourceの定義ファイルにfreshnessブロックを定義する。 公式による仕様の説明は以下の通り。 version: 2 sources: - name: freshness: warn_after: count: period: minute | hour | day error_after: count: period: minute | hour | day filter: loaded_at_field: tables: - name: freshness: warn_after: count: period: minute | hour | day error_after: count: period: minute | hour | day filter: loaded_at_field: ... loaded_at_fieldにデータ鮮度の判定に利用するカラムを指定する。 loaded_at_fieldと現在時刻の差がcount、periodに定義した時間を超えていた場合にレポートする。 レポートには警告とエラーの2種類が存在し、warn_afterに警告、error_afterにエラーの場合を書く。 count,periodの単語選びのセンスがちょっと分からない。periodが単位、countが数値である。 例えばcount=2、period=dayなら「2日」。Declarativeに読めば良いのか? freshnessブロックは継承関係を持たせることができる。 つまりsources直下に書いたものでデフォルトを定義し、tables配下に書いたもので上書きできる。 loaded_at_fieldカラムはtimestamp型かつUTCである必要があり、変換例が公式に載っている。 # If using a date field, you may have to cast it to a timestamp: loaded_at_field: \"completed_date::timestamp\" # Or, depending on your SQL variant: loaded_at_field: \"CAST(completed_date AS TIMESTAMP)\" # If using a non-UTC timestamp, cast it to UTC first: loaded_at_field: \"convert_timezone(\'UTC\', \'Australia/Sydney\', created_at_local)\" ここでは以下のような定義とする。 version: 2 sources: - name: jaffle_shop database: ikuty_raw schema: jaffle_shop tables: - name: customers - name: orders loaded_at_field: _etl_loaded_at freshness: warn_after: {count: 12, period: hour} error_after: {count: 24, period: hour} dbt source freshnessコマンドによりデータ鮮度が評価される。 _etl_loaded_atの最大値が現在よりも12時間以上前だったのでWARNが出た。 $ dbt source freshness 15:10:34 Running with dbt=1.5.0 15:10:34 Found 1 model, 0 tests, 0 snapshots, 0 analyses, 321 macros, 0 operations, 0 seed files, 2 sources, 0 exposures, 0 metrics, 0 groups 15:10:34 15:10:35 Concurrency: 1 threads (target=\'dev\') 15:10:35 15:10:35 1 of 1 START freshness of jaffle_shop.orders ................................... [RUN] 15:10:37 1 of 1 WARN freshness of jaffle_shop.orders .................................... [WARN in 1.72s] 15:10:37 Done. 宣言的記述について ソフトウエアパラダイムの1つとして市民権を得ている宣言的(Declarative)記述について書いてみる。 開発者人口が多いVue.jsかReact.jsで爆発的に認知され(ひと昔前に)一気に当たり前になった印象がある。 歴史的な経緯からフロントコードは非同期かつイベント駆動であって、酷い可読性だった覚えがある。 onClickの中にonClickを書いて、その中にonClickを書いて...みたいなことが起こり得た。 JSの言語仕様でasync awaitパターンがサポートされステートマシンを合理的に記述できるようになった。 「テキストボックスにバインドする変数はX」と書くだけで、関係する処理が省略できてむっちゃ楽。 個人的には構成管理ツールのAnsibleで宣言的記述の良さを実感できた気がする。 構成管理の界隈では、冪等性が重要で例えば「nginxは192.168.1.64:8080でlistenすること」 のように定義しさえすれば、何度実行しても設定が定義値であることが保証される仕組みが欲しい。 もし状態を宣言的に記述できなければ、細かい処理を自力で実装しなければならなくなる。 「listen: 192.168.1.64:8080」と書いて実行しさえすれば良いならむっちゃ楽。 ミドルウェアとデータの違いはあるが、実体があるものを抽象化し振る舞いを状態定義する様から、 dbtはAnsibleやTerraformに近い気がする。(データの構成管理をしているような...) freshnessを確認するコードをJavaで書けとか言われたらタル過ぎるので、 dbtで当たり前のように宣言的に記述できるのはありがたい。 まとめ dbt Fundamentalsの「Sources」セクションを聴いて、内容を文書化してみた。 Sourceを抽象化することでDRYを実現できること、 抽象化した先でデータ鮮度の確認ができることについて定義や実行例を追って確認してみた。

default eye-catch image.

モデルについて考えてみた話

dbt Fundamentalsの「Models」のセクションについて、理解した内容を言葉にしてみた。 かなりの部分で感想を織り交ぜてみた。この記事は前回の記事(以下)の続き。 [clink url=\"https://ikuty.com/2023/07/10/dbt-word/\"] 実践的なコンテンツというよりは、概念の理解を優先した入門用のコンテンツであって、 これでモノを作れることはないと思うし、資格対策には不足していると思う。 動画の講師の方は自己紹介で教師のバックグラウンドがあると話されている。 個人的にはUdemyの類似品より構成と英語の発音がわかりやすいと感じる。 [arst_toc tag=\"h4\"] クエリをバラして理解しやすいようにしたい 関数型言語は、関数が再起的に評価されることで最終的な応答が確定する。 手続き型言語と異なり、再帰の評価の途中で一時停止して内容を観察することが難しい。 SQLは関数型言語ではないが、内包するSQLの評価を一時停止して観察することが難しい、 という点で近い。制御がない一筆書きである点が複雑さから逃れる理由になっていそう。 ビジネスロジックが乗った巨大なクエリは確かに一筆書きではあるが、理解しづらい。 クエリをバラしたら理解しやすいんじゃね? みたいな動機があるのだと思う。 ただバラして制御するとそれはもう手続きなので、バラすけど再利用するだけに留める。 バラして、途中経過を観察したり動きを制御したり、そういう仕組みをdbtが提供する。 Modularity クエリをバラすことを、dbtでは「Modularity(モジュール性)」という用語で説明している。 動画では「車の製造」が例えとして挙げられている。パーツとパーツを繋げて車を製造すると。 Modularityは構造化されていて、上流(Upstream)から下流(Downstream)へと繋がる。 Upstreamで作ったパーツをDownstreamで再利用する、という仕組み。 1個1個のパーツは、CTE(Common Table Expression)を利用して表現される。 CTEは標準SQLに定義されていて、クエリの再利用性を高める表現手段。 要は、dbtはCTEを使ってクエリの再利用性を強制するF/Wなんだろうと思う。 CTEと論理レイヤとモデル 実際に動作する生クエリを書くのではなく、論理的な記述レイヤを設けている。 論理レイヤの記述フォーマットとして、jinjaテンプレートが採用されている。 jinjaテンプレートにマクロを記述することで論理レイヤの表現能力を獲得している様子。 Modularityの観点でCTEで各パーツを定義し、Up->Downのフローを論理的に定義する。 dbtが論理レイヤをコンパイルして物理SQLを吐くという仕様になっている。 このパーツのことをdbtでは「モデル」と呼んでいる様子。 1個のSELECT文が1個のモデルと対応する制約が与えられる。 これにより、Up側モデルをDown側モデルで再利用することと、クエリ結果の再利用を紐付けている。 Materialization パーツは個別に具体的なSQLにコンパイルされる。これには「Materialize」という用語が付いている。 Upstream側を実体化する際、その実体化の手段を選択できる。 View モデルをViewとして実体化する。実データを作らないので作るのが速いしストレージを食わない。 Viewに乗ったロジックが複雑だと所要時間が増えるので、その場合はTableの採用を考える。 CREATE VIEW AS ... が使われる。 Table モデルをTableとして実体化する。実データを作るので作るのに時間がかかるしストレージを食う。 ロジックが複雑であっても所要時間が増えない。BIから直接参照するとかならTableの採用を考える。 Up側にあるデータに追加や変更があっても反映されない。 CREATE (TRANSIENT) TABLE AS ... が使われる。 Incremental パイプラインを構築する際、「置き換え」なのか「増分」なのかは重要な観点となる。 初回生成時は通常のTableとして、2回目移行は増分だけを生成する。なので速い。 「増分」とする場合、どこが既存と増分の境界なのかを定義する必要がある。 その定義をモデルファイルに書く必要があり、若干複雑になる。 Ephemeral 中間テーブルをいちいちViewやTableに実体化し続けると、DWが再利用する可能性が低い何かで 埋め尽くされてしまうことがある。そういったテーブルはModularityの中で論理的に再利用したいが、 物理的に存在して欲しくない。それを実現できる。 Downstream側で1個か2個使うだけで、直接クエリを実行する必要がない場合に使うと良い。 モデルの例 動画では以下のクエリをdbt流に書く例が示されている。 以下は生クエリで、with句により3個のクエリを用意し4個目のクエリで結合している。 with customers as ( select id as customer_id, first_name, last_name from raw.jaffle_shop.customers ), orders as ( select id as order_id, user_id as customer_id, order_date, status from raw.jaffle_shop.orders ), customer_orders as ( select customer_id, min(order_date) as first_order_date, max(order_date) as most_recent_order_date, count(order_id) as number_of_orders from orders group by 1 ), final as ( select customers.customer_id, customers.first_name, customers.last_name, customer_orders.first_order_date, customer_orders.most_recent_order_date, coalesce(customer_orders.number_of_orders, 0) as number_of_orders from customers left join customer_orders using (customer_id) ) select * from final さて、次はdbtの論理レイヤの話。まずcustomersとordersをを別のファイルに記述する。 with customers as ( select id as customer_id, first_name, last_name from raw.jaffle_shop.customers ) select * from customers with orders as ( select id as order_id, user_id as customer_id, order_date, status from raw.jaffle_shop.orders ) select * from orders customersとordersを結合する。その際、customersとordersをref関数により参照する。 言い換えると、以下が評価される時点で、既にcustomersとordersは実体が存在する。 そのため、以下を評価する前に、customersとordersの中にあるデータを確認できる。 with customers as ( select * from {{ ref(\'stg_customers\')}} ), orders as ( select * from {{ ref(\'stg_orders\') }} ), customer_orders as ( select customer_id, min(order_date) as first_order_date, max(order_date) as most_recent_order_date, count(order_id) as number_of_orders from orders group by 1 ), final as ( select customers.customer_id, customers.first_name, customers.last_name, customer_orders.first_order_date, customer_orders.most_recent_order_date, coalesce(customer_orders.number_of_orders, 0) as number_of_orders from customers left join customer_orders using (customer_id) ) select * from final dbt runコマンドにより、全てのモデルをコンパイルする。 また --selectをつけると、、特定のモデルだけコンパイルできる。 dbtには下図のようにモデル間の依存関係をグラフィカルに表示する機能がある。 モデリングの方法論とdbtの仮定 dbtは単なるクエリの変換ツールであって、現実世界のモデルする方法論を規定するものではない。 しかし、動画ではモデルのセクションでチラッとこの話に触れられている。 Fact TableとDimensional Tableに整理してDimension毎のFactを提供しましょう、 という流儀や、Kimball,Data Vaultのような流儀など、諸々存在する。 動画では、これらをTraditionalと分類し正規化がポイントであると言っている。 コンピューティングとストレージが安くなり、ELTが可能になったおかげで、 \"正規化しない(Denormalized)\"モデリング手法が実現可能になりましたよ、と言っている。 むむ.. 相当深そうだが入り口だけ。 いったん、ELTを前提としてモデルを組み上げてください、ぐらいの話として聞き取る。 ELTを前提として、モデルをこうレイヤリングし名前を付けますよと話されている。 Source ELTであるが故に、外部のデータを無変換でDWに格納したところからスタートする。 そのレイヤを「Source」と名付ける。 Staging Sourceレイヤのモデルのクレンジングと標準化を行うレイヤを「Staging」と名づける。 StagingモデルはSourceモデルと1:1対応させる。 Intermediate Stagingモデルに対してビジネスロジックを適用して最終的なテーブルを生成する。 ビジネスロジックはダラダラと汚く大きくなることが常で、このレイヤで吸収する。 Fact 起きていること、または、既に起きたこと。履歴があるもの。 なんたらイベント、Webサイトにおけるクリック事象、投票など。 Dimension 時間の経過で発生するものではない履歴がない事象。 人、場所、コト、ユーザ、会社、製品、、顧客など。 以下は、動画で話されている実行例。 dbt-labs/jaffle_shop https://github.com/dbt-labs/jaffle_shop 一番左の緑色のモデルがSource。外部から引っ張ってきたRawデータ。 Sourceと1対1対応する形で、「stg_*」という名前のStagingモデルが存在する。 Sourceに対してクレンジング、標準化を適用したデータ。 「fact_*」は、時間経過により発生する履歴データ、そして、「dim_*」は属性データ。 Down側からUp側へ、Source->Staging->Fact/Dimension という流れになっている。 よもやま話(動画と関係なし) 例えばLaravel/EloquentやRails/ActiveRecordにはQuery Builderが付いていて、 手続き型言語によってクエリのModularityを得るアプローチには既視感があると勝手に思っている。 もちろんクエリ結果をDWに統合できないから、DWに統合できるF/Wが必要なので、 そのアプローチはSnowflakeで言えばSnowparkによって扱われるのかなと思う。 手続き型言語ではソフトウエア開発の複雑さから逃れられないことになっている。 ちなみにORMにおける物理テーブルの抽象化において「モデル」という用語が充てられている。 少なくとも1990年代後半にはORMの文脈で物理テーブルを抽象化する概念は実用化されていた気がする。 2000年代には、オブジェクト指向分析設計の文脈で「現実世界のモデル化」がイキりたおされていた。 2010年代、Laravel/Eloquent, Rails/ActiveRecordでORMのモダニズムが言われてた。 手続きに持ち込まず、SQLの世界だけでモデル化が話されるのは新しい気がする。 ここは最初の「Analytics Engineers」の話に由来している気がする。 まとめ dbt Fundamentalsの「Models」セクションを聴いて、内容を文書化してみた。 dbtはModularityと考え方に基づいてCTEによりクエリをバラすツールであることについて、 Modelの定義や実行例を追って確認してみた。

default eye-catch image.

ETLとELTの違いが説明されていたので聞いてみた話

サーバーサイドエンジニアのデータベースエンジニア寄りの枠ぐらいの認識しかないと、 データエンジニアリングの尖った部分に永遠に近づけないという感触がある。 サーバーサイドエンジニアの入口から入った場合はちゃんとチェンジしないといけないな。 dbtが公式で「データエンジニアリングの用語と概念」を長い尺で説明していて学んでみた。 サーバーサイドエンジニア的にはETLは普通のジョブやバッチ処理だと思うが、 ELT化することで何を改善しようとしていて何が達成されたのか説明できるようになった。 dbt Fundamentals https://courses.getdbt.com/courses/fundamentals [arst_toc tag=\"h4\"] ETLとELTの違い 旧来から分析基盤を作るためにデータを抽出してどこかに蓄え加工してDBに入れてきた。 これは、巨大なデータをダイレクトに入れる箱や資源がなくそれ以外の選択肢がなかったため。 これはもうサーバーサイド開発であってバッチ処理で必要なデータを揃えているのにあたる。 構成を実現するためのインフラレベルの観点、分析用途に加工するビジネスロジックの観点の2つ。 この2つをゴチャっと処理する巨大なソフトウエアを書かないといけなかった。 クラウドベースのDWが登場し、巨大なデータを入れる箱や資源が安く手に入るようになった。 そしてdbtのように「生データをうまいこと加工する」ツールが手に入るようになった。 抽出したままの生データを突っ込むのは簡単だし、ツールでうまいこと加工できるようになった。 もちろんSQLだけで加工するので出来ること出来ないことはあるが、 うまくいけば、誰も開発やメンテがしづらい巨大なソフトウエアを書かなくてもよくなった。 分析用ビジネスロジックの分離 dbt公式のdbt Fundamentalsの初っ端で、 抽出したデータをDWに投入するインフラレベルの人たちを「Data Engineers」、 生データをBIで必要なレベルまで加工する人たちを「Analytics Engineers」、 分析する人たちを「Data Analysts」と分類している。 実際、エンジニアが分析対象を理解しようとすると、キャリアの壁にぶち当たる。 エンジニアは技術力が必要だし分析者は要件定義能力が必要。 要件定義的なことができるエンジニアは圧倒的に不足しているのが世の常で、 エンジニアに対する要件から要件定義能力を分離したいというのは切実な思い。 dbt Fundamentalsで紹介されている下の表はよく出来ていると思う。 (フロントにETLを前提としたBIツールがあってTとLをUIでやらせる世界観だと 組み合わせたときにELTLになってしまう。トータルで考えないと上手くいかないと思う) Modern Data Stackとdbt dbtは「T」をやるツール。dbtの有名な図は以下。「E」と「L」は外。BIやMLは外。 ほぼSQLとymlだけで構成できるところが特徴。 SQLを使ってモデルを開発して、SQLを使ってテストして、ドキュメンテーションを構造化する。 WHがあって初めて存在するツールであって、WHの資源を使って動く。 オーケストレーションとDAGの管理が内包される。 まとめ インフラ的な観点とビジネスロジック的な観点の2つがゴチャった何かを作るのは大変という世界がある。 まぁそれでも書けよ、とも思うけど単にデータを抜いて格納するだけの人と、 要件定義してビジネスロジックを考えてデータを加工する人を分けられれば、人を集めやすい。 強力なDWが出来たことでデカいデータを生でぶち込めるようになったし、dbtみたいなツールができて ビジネスロジックに基づいて生データを加工するのが簡単になった。 トータルで複雑なソフトウエアを作らなくてもよくなるからELT美味しいよ、という話でした。

default eye-catch image.

Azure環境において最小権限でストレージ統合を構成する

Azure側を最小権限で構成する方法が書かれた記事がありそうで全然見つからない。 そこで、本記事ではAzure側を最小権限で構成する方法を調べてまとめてみた。 2023年7月現在、公式はSASを使う方法ではなくストレージ統合を使う方法を推奨している。 ストレージ統合によりAzureADへ認証を委譲することで認証情報を持ち回る必要がなくなる。 本記事ではストレージ統合を使用する。 ストレージアカウントにはURL(認証)・N/Wの2系統のパブリックアクセスの経路が存在するが、 URL(認証)によるパブリックアクセスは遮断し、Snowflake VNetからのアクセスのみ許可する。 Azure Portal/CLIからのアクセスを固定IPで制限する。 [arst_toc tag=\"h4\"] ストレージアカウント ストレージアカウントは、Blob(Object)、Files(SMB)、Queue、Table(NoSQL)の4種類が存在する。 Snowflakeの外部ステージとして使用するのはAWS S3でありAzure Blobである。 Blobは最上位階層にない。ストレージアカウント内にはコンテナがあり、コンテナ内にBlobがある。 Blobストレージ内のオブジェクトに対してのみ、外部からアクセス(パブリックアクセス)できる。 パブリックアクセスには、URL・ネットワークの2経路が存在し、それぞれ独立して存在する。 他方を制限したからといってもう一方が自動的に制限されたりはしない。 パブリックURLアクセス ストレージアカウントレベルで、パブリックアクセスを以下から選択できる。 Enabled Disabled また、コンテナレベルで、パブリックアクセスを以下から選択できる。 Private (no anonymous access) Blob (anonymous read access for blobs only) Container (anonymous read access for containers and blobs) 上位階層であるストレージアカウントレベルの設定が優先される。上位で不許可にすれば不許可。 もしストレージアカウントレベルで許可した場合、コンテナレベルの設定が効く。 その組み合わせは以下の通り。確かにMECEなのだが絶望的に冗長に思える。 コンテナ自体はURLアクセス不許可だがBlobはURLアクセス可とか設定できてしまう。 パブリックネットワークアクセス パブリックネットワークアクセスとして以下から選択できる。 Enabled from all network Enabled from selected virtual networks and IP addresses Disabled 分かりづらいが、ストレージアカウント/Blobへのアクセスに使われるN/Wインターフェースを選ぶ。 Disabledにすると、仮想ネットワークのプライベートIPアドレスを使用するN/Wインターフェースとなる。 Enabledには2つあり、全てのN/Wからアクセス可能なN/Wインターフェースか、 選択した仮想N/WまたはIPアドレスからアクセス可能なN/Wインターフェースを選ぶ もちろんDisabledにした場合は、仮想プライベートエンドポイントが必要となる。 ストレージ統合の際のストレージアカウントの最小権限構成 以下のようにして最小権限構成を行なった。 パブリックURLアクセスをストレージアカウントレベルでDisabled パブリックネットワークアクセスを固定IP、及びSnowflake VNet IDsに設定 Snowflakeのストレージ統合機能によりサービスプリンシパルを作成 ストレージコンテナのIAMでサービスプリンシパルにBlob読み書きロール設定 ストレージアカウントに対するSnowflakeのVNetサブネットIDsからのアクセス許可 ストレージアカウントへのパブリックアクセスを「選択したVNetまたはIPアドレスのみ」とした場合、 SnowflakeのVNetまたはIPアドレスが分かる必要がある。IPアドレスは不定なのでVNet IDsを指定する。 公式にドンピシャの説明があるので、それを参考にVNet IDsを設定する。 VNet サブネット IDs の許可 Snowflake側で以下のSQLを実行すると、属するVNet IDがJSONで複数出力される。 それを記録しておく。 USE ROLE ACCOUNTADMIN; SELECT SYSTEM$GET_SNOWFLAKE_PLATFORM_INFO() ; { \"snowflake-vnet-subnet-id\":[ \"/subscriptions/hogehoge\", \"/subscriptions/fugafuga\" ] } 続いて、ストレージアカウントに上の2個のVNet IDからのアクセスを許可する。 以下のように、Azure CLIでログインしてVNet IDの個数だけコマンドを実行する。 $ az storage account network-rule add --account-name --resource-group --subnet \"/subscriptions/hogehoge\" ... $ az storage account network-rule add --account-name --resource-group --subnet \"/subscriptions/fugafuga\" ... ストレージ統合を作成する AWS環境と同様に、CREATE STORAGE INTEGRATION によりストレージ統合を作成する。 Azure CLIやAzure PortalでテナントIDを取得しておいてAZURE_TENANT_IDに渡す。 STORAGE_ALLOWED_LOCATIONSにはBLOBの位置を表す文字列を渡す。 書式は azure://ストレージアカウント名.blob.core.windows.net/コンテナ名/パス 。 BLOBの位置は複数渡せる。 CREATE STORAGE INTEGRATION storage_integration_azure TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = \'AZURE\' ENABLED = TRUE AZURE_TENANT_ID = \'\' STORAGE_ALLOWED_LOCATIONS = (\'azure://storageikuty1.blob.core.windows.net/container1/files/\') ; 続いて、同意URL(AZURE_CONSENT_URL)を取得する。 DESC STORAGE INTEGRATION storage_integration_azure ; property property_type property_value property_default ENABLED Boolean true false STORAGE_PROVIDER String AZURE STORAGE_ALLOWED_LOCATIONS List azure://storageikuty1.blob.core.windows.net/container1/files/ [] STORAGE_BLOCKED_LOCATIONS List [] AZURE_TENANT_ID String AZURE_CONSENT_URL String https://login.microsoftonline.com/hogehoge&response_type=code AZURE_MULTI_TENANT_APP_NAME String COMMENT String URLを開き、表示された画面で「承認」ボタンを押下する。 この際、権限がなければ権限不足である旨表示されてここで終了する。(権限は後述) このタイミングでAzure環境にSnowflakeと対応するサービスプリンシパルが作られる。 上手くいくとSnowflakeの企業サイトに遷移する。 Azure Portalにログインし、Snowflakeと共有したいストレージアカウントを選択する。 IAMから、サービスプリンシパルに対して以下のロールを割り当てる。 サービスプリンシパルIDは、上で得られたマルチテナントアプリ名のハイフンの前の部分。 この辺り、むちゃくちゃ煩雑で、AWSよりも大変。 Storage Blob Data Reader(ストレージBlobデータ閲覧者) Storage Blob Data Contributor(ストレージBlobデータ共同作成者) 次にDBにフォーマットと(外部)ステージを作成する。 USE ROLE SYSADMIN; USE DATABASE HOGE; CREATE OR REPLACE FILE FORMAT MY_CSV_FORMAT TYPE = CSV COMMENT = \'接続検証\' ; CREATE OR REPLACE STAGE my_azure_stage STORAGE_INTEGRATION = storage_integration_azure URL = \'azure://storageikuty1.blob.core.windows.net/container1/files/\' FILE_FORMAT = my_csv_format; ; 該当のBlobにファイルをアップロードする。(パスを指定した場合忘れがち) そして疎通確認。 $ show storage_integration_azure ... $ list @storage_integration_azure ... まとめ Azure環境でストレージ統合によりBlobアクセスを設定してみた。 その際、Azure環境側でパブリックアクセスの開き方が最小限になるように構成した。 Azure側、Snowflake側共に大きな権限が必要だが、公式推奨だし可能ならこの方法が良さそう。

default eye-catch image.

Azure Data Factoryに入門する

Azureクラウド。データと特にAIの世界で頭ひとつ抜け出しそうだ。 有限な時間を有効に使うには、\"ベストソリューション\"に全振りすることが適切だと信じているが、 一方で、それだと認知の奥行きのようなものが少ない気がしている。 考え方を広げるには色々知るべきだとも思う。 知識を糧にするために必要なことは要約と文書化だと信じている。 データ・パイプラインを構築する数多の技術の1つとして、Azure Data Factoryがある。 今回、Azure Data Factoryに入門してみようと思う。 Azureクラウドは公式の説明に癖がある。 分かるまでとっつきづらい印象がある。知識を糧にするため要約と文書化を続けてみたい。 [arst_toc tag=\"h4\"] パイプライン、アクティビティ、データセット、リンクされたサービス データが物理的に格納されている場所をデータセットとリンクする。「リンクサービス」などと言う。 対応するサービスは後述する。 アクティビティには入力データセットと出力データセットを設定できる。 アクティビティは入力データセットのデータを消費し、出力データセットにデータを生成する。 アクティビティは大きく「データのコピー」と「データの変換」と「制御」の責務を持たせられる。 複数のアクティビティを束ねてパイプラインを構成する。これらの関係は下図の通り。 このうち「制御」アクティビティは、変数の追加、別のパイプラインの実行、Assert、ForEach、 If Condition、ルックアップ、変数の設定、Until、検証、Wait、Web、Webhookなどができる。 ユーザは、UI上でポチポチしてパイプラインを組み上げられる。 下図のように、アクティビティの出力を別のアクティビティの入力とすることで機能を作り込んでいく。 前のアクティビティが正常終了しなかった場合、次のアクティビティは実行されない。 もちろん並列実行させることもできる。 サポートするデータストア・コネクタ 移動アクティビティは、ソースからシンクへデータをコピーする。 サポートされているデータストアは公式に記述があるが、一部記述に矛盾があり信用できない。 Azure Data Factory と Azure Synapse Analytics のコネクタの概要 さすがに仕様上は凄まじいサポート具合だと思う。[汎用]により事実上無限に繋げられる。 [Azure] Blob, Cognitive Search Index, CosmosDB, Data Explore, ADSL Gen1/Gen2, Azure Database (MariaDB/PostgreSQL/MySQL), Databricks Delta Lake, Files, SQL Database, SQL Managed Instance, Synapse Analytics, Table Storage [Database] AWS RDS(Oracle, SQL Server), AWS Redshift, DB2, Drill, GCP BigQuery, GreenPlum, HBase, Apache Impala, Informix, MariaDB, Microsoft Access, MySQL, Netezza, Phenix, PostgreSQL, SAP Business Warehouse, SAP HANA, Snowflake, Spark, SQL Server, Sybase, Terradata, Vertica [NoSQL] Cassandra, MongoDB [Files] S3, FileSystem, FTP, GCP Cloud Storage, HDFs, Oracle Storage, SFTP [サービスとアプリ] Amazon Marketplace WebService, Appfingures, Asana, Concur,data world, Dataverse, Dynamics365, Dynamics AX, Dynamics CRM, GitHub, Google AdWords, Google Spredsheet, HubSpot, Jira, Magento, Microsoft365, Oracle Eloqua, Oracle Responsys, Oracle Service Cloud, Paypal, Quickbase,Quick Books,Salesforce, Salesforce Service Cloud, Sales Marketing Cloud, C4C, SAP ECC, Service Now, SharePoint Online, Shopify, SmartSheet, Square, TeamDesk, Twillo, Web(HTML), Xero, Zen Desk, Zoho [汎用] HTTP, OData, ODBC, RESTfulAPI ファイル形式は以下。 Arvo,バイナリ,Common Data Model形式,区切りテキスト, 差分形式, Excel, JSON, ORC, Parquet, XML スケジュール設定と実行 「スケジュール」と名前が付くものがパイプライン、アクティビティ、データセットに存在するが、 パイプラインの実行時刻・タイミングを定義するのは「出力データセット」であるようだ。 アクティビティの実行タイミングは、入力ではなく出力のデータセットのスケジュールで決まる。 このため、入力データセットは省略できるが、出力データセットは必ず1個作る必要がある。 アクティビティと入出力データセットの関係が下図に示されている。 出力データセットが、「8-9AM」、「9-10AM」、「10-11AM」の3回の枠でスケジュールされている。 それに合わせてアクティビティと入力データセットのスケジュールが決まる。 下図はさらに、入力が準備され、アクティビティと出力がこれから実行される様を表している。 アクティビティにスケジュールをオプションで設定できるが、 その場合は出力データセットのものと合わせる必要がある。(何の意味があるのか...) パイプラインにパイプラインがアクティブな期間を設定できる。 非アクティブな時間に出力データセットが発火しても無視される。 統合ランタイム(IR) Azure内のフルマネージドなサーバーレスコンピューティング。 仮想化されたコンピューティングリソースをData Factoryでは統合ランタイムと呼ぶ。 悪いセンスの極みみたいな名前だが、Windowsで開発した経験があるとピンと来ると思う。 アクティビティや入出力データセットで定義された内容を処理する際に消費される計算資源。 各処理がどこで実行されるのか、というのはパイプラインの実行で気になるポイント。 可能な限りデータの移動やコピーは省略して欲しいし、各サービスが得意な機能を使って欲しい。 この辺りを最大限考慮して必要な場所で処理するよ、と公式には書かれている。 3種類の統合ランタイムが存在する。 Azure統合ランタイム Azureでデータフローを実行する。クラウドストア間でコピーアクティビティを実行する。 コピーは負荷が高い操作であるので、オートスケールが考慮されている。 他に、アクティビティの負荷をコンピューティングに割り当てる(ディスパッチ)機能を有する。 Self-Hosted統合ランタイム オンプレミスかAzure上のプライベートネットワークにインストールするコンピューティングリソース。 Windowsマシンを自前で用意(Self-Host)して、Azureサービスに提供するというもの。 まさにMicrosoftのWindowsをAzureと繋ぐコンピューティング仮想化の姿とも言える。 オンプレミスに大量のコンピュータを用意してAzureに提供したりすることができる。 Azure-SSIS統合ランタイム SSIS(SQL Server Integration Service)パッケージを実行する専用のAzure上の フルマネージドクラスタ。 SSISプロジェクト用に独自のAzureSQL Database、SQL Managed Instanceを持ち込める。 ノードサイズのスケールアップ、クラスターのスケールアウトを実行できる。 SSIS統合ランタイムを手動で停止することでコストを節約することもできる。 SSDTやSSMS等、SQLServer用クライアントツールをSSIS統合ランタイムに繋ぐことができる。 統合ランタイムの場所 仮想マシンがAzureクラウド上のどこに物理的に配置されるのかは気になるところ。 一般に性能観点だけであればデータに近いところに仮想マシンを配置しておくと良い結果となる。 が、データコンプライアンスの観点で、仮想マシンを配置する場所を固定しないといけない場合がある。 Azure IR Azure IRについて、デフォルトでは自動的に配置場所が決まる。 つまり、シンクデータストアと\"同じリージョン\"または\"同じ地理的な場所\"に配置される。 クラウドサービスを使っていると、そのサービスのリージョン、地理的な場所が不定となる場合がある。 シンクデータストアの場所がわからず、世界中の好き勝手なところにAzure IRが作られてしまう。 そんな時は、自動的なコンフィグレーションを無視して配置場所を固定することができる。 また、データが特定のリージョンから離れないように、明示的にVMの場所を固定できる。 Self-hosted IR Self-hostするものなので、そもそも場所は自分で決めるもの。 Azure-SSIS IR 箱としてのデータベースと、そのデータベースを駆動するコンピューティングリソースが分離している。 普通に考えて、箱の場所とVMの場所は同じにしないとパフォーマンスが悪いだろう。(公式に記載あり) ただし、Data FactoryとSSIS IRの場所は必ずしも同じでなくても良いようだ。 データ統合単位(DIU) なんか説明なくいきなり出てくるワード。Data Integration Unitの略だろうか。 Azure Data Factoryにおいて、1つの単位の能力を表す尺度。 CPU、メモリ、ネットワークリソース割り当てを組み合わせたもの。 DIUはAzure IRランタイムにのみ適用される。Self-hostedランタイムには適用されない。 要はAzure IRランタイムの戦闘力を数値化したもの。課金に影響する。 データ統合単位 シナリオにあった統合ランタイムの選択 Azure統合ランタイムからパブリックにアクセスできないデータストアにアクセスする場合、 データストアのファイアウォールなどにAzure統合ランタイムのパブリックIPを設定して抜け穴を作る。 しかし、Azureを含むクラウドアーキテクチャの設計においてあまり望ましくない。 よりベターなのは、PrivateLinkや、Load Balancerの追加セットアップを行う。 また、オンプレミスにある場合、Express Route、S2S VPN経由でアクセスを行う。 この追加セットアップは必要以上に面倒を増やすし、最初からSelf-Hostedにした方が良い。 Enterpriseの現場において、データ転送の経路に対するセキュリティ要件は厳しい。 転送経路が全てプライベートになっている状態を保証できることが理想となる。 つまり、プライベートエンドポイント間のPrivateLinkを設定することが理想である。 Azure統合ランタイムは、デフォでプライベートエンドポイントとPrivateLinkはサポートされない。 マネージド仮想ネットワークを使用すると、データストアに対してプライベートエンドポイントを設定し、 統合ランタイムをPrivateLinkを設定できる。 また、Self-hosted統合ランタイムにおいて、仮想ネットワークにプライベートエンドポイントと PrivateLinkを設定できる。 クラウドインフラストラクチャの責任共有モデルについての議論もある。 Azure統合ランタイムのパッチ、バージョン更新等のメンテナンスはAzure側が自動的に行う。 対して、Self-hosted統合ランタイムは、ユーザが責任を持つ必要がある。 より楽をしたいならAzure統合ランタイムがより良い選択肢となる。 まとめ ドキュメントを調べてAzure Data Factoryの用語をまとめてみた。 Azureクラウドの公式ドキュメントは癖があり、ドキュメントを読むだけで一苦労だなという印象。 やはり、AWSと比較してEnterpriseを全面的に推している印象がある。 後続の記事でガンガン知識を文書化していく。