default eye-catch image.

Streamlit appをrestricted caller’s rightsで動作させる場合にコンテナインスタンスが必須となる背景を考えた話

行レベルセキュリティに頼ったマルチテナントは、実装ミスによる認可破綻が起こりやすいので怖い。 RBAC・インフラレベルで認可する仕組みができれば、appは認可コードを一切かかずに、 Snowflakeに認可の安全性を移譲できる。 しかし、これまでStreamlit in SnowflakeではOwner\'s rightsでしか動作せず、 実現ができなかった。6月1日に「Restricted caller\'s rights」が一般提供(GA)され、 caller\'s rightsで Streamlit を動作させられるようになった。 ただし、コンテナインスタンスが必須となる。 どういう仕組みで機能するのか気になったので調べてみた。 [arst_toc tag=\"h4\"] Restricted caller\'s rightsが一般提供(GA)された これまで、ストアドプロシージャ、SPCSサービス、Streamlit in SnowflakeアプリはOwner\'s role、 すなわち、リソースの所有者の権限でしか動作させることができなかった。 2026年6月1日に「Restricted caller’s rights」がGAされたことで、 これらのリソースをCaller\'s role、すなわち呼び出し元権限で動作させることが可能となった。 呼び出し元の権限次第で、Snowflake側のガバナンスが全く効かない、という世界線は存在せず、 「呼び出し元の権限」に対して「別のロールによる許可」で制限した権限でのみ、これらを実行できる。 混乱する代理人問題とCaller\'s right Caller\'s rightで動作する、ということは、所有者が書いたコードを、閲覧者の権限で実行するということ。 閲覧者の強い権限により「閲覧するだけのつもりだったがDROPできてしまった」みたいなことになる。 これを混乱する代理人問題と言い、権限を持つ閲覧者が所有者のコードに意図せず権限を貸している。 アプリが悪意を持っていたりアホだったりした場合に被害が拡大する要因となる。 GRANT CallerとCaller\'s rightの権限波及の仕組み 実行主体が所有者(Owner)から呼び出し元(Caller)に移るためSnowflake側の権限波及が大きく変わる。 管理者が別の管理者に MANAGE CALLER GRANTS することで、別の管理者は CALLER GRANTS できる。 別の管理者は CALLER USAGE, CALLER SELECT 等で「このアプリ(Owner\'s role)が閲覧者の代理として 使って良い権限」をホワイトリスト形式で指定する。 結果、閲覧者がたまたま ACCOUNTADMIN でも 管理者はアプリが許可した対象へのSELECTしか 行使できない。つまり、閲覧者がDROPやALTERを持っていたとしてもアプリ経由では実行できない。 閲覧者の身元・ポリシー(行/列制御)は活かしつつアプリが行使できる権限の上限は管理者が固定する、 が実現できるようになる。 なぜウェアハウスランタイムではCaller\'s right動作ができないのか Restricted caller’s rights and Streamlit in Snowflake 公式によると、ウェアハウスランタイムではCaller\'s right動作できない。 By default, all Streamlit in Snowflake apps run with the privileges of the owner, not the privileges of the caller. The Streamlit app developer can define whether a container-runtime app runs with owner’s rights or restricted caller’s rights. Restricted caller’s rights aren’t supported in warehouse runtimes. Restricted caller’s rights requires Streamlit version 1.53.1 or later. 以下が核心。Streamlit in SnowflakeアプリはSQLをOwner権限のストアドプロシージャ相当のサンドボックスで動く。だから、DESCRIBE/SHOW/LIST 制限などの「ストアドプロシージャ相当の制約」が付く。 Runtime environments for Streamlit apps Warehouse runtimes provide an on-demand, personal instance of the Streamlit app for each viewer. When a viewer opens the app, a new instance of the app is created for that viewer. Each viewer has their own isolated environment, which increases user load times. While both runtimes execute SQL queries using the owner’s privileges, apps using warehouse runtimes are subject to similar restrictions as owner’s rights stored procedures. For more information, see Owner’s rights stored procedures. ストアドプロシージャ準拠の権限モデルにはCaller\'s rightが想定されていなかったため、 同じ理由でStreamlit in SnowflakeでもCaller\'s rightができなかった。 Caller\'s right の本質は、所有者とは別の閲覧者としてのセッションを張る、ということ。 これは、アプリプロセスが複数の認証コンテキスト(OwnerとCaller)を同時に保持・維持する必要がある。 ステートレスなアプリのホスティングでセッション変数が揮発する問題の解決に、 ウェアハウスにステートを持たせるという解決策をとらないのは、それは当たり前だし、 そういう用途なら、常時稼働でステートを持てるコンテナウェアハウスが適当だよ、ということかな。 ストアドプロシージャにおけるOwner\'s right、Caller\'s rightについては以下があたる。 [clink implicit=\"false\" url=\"https://docs.snowflake.com/en/developer-guide/stored-procedure/stored-procedures-rights\" imgurl=\"https://www.snowflake.com/content/snowflake-site/global/ja/lp/snowflake-ai-data-predictions/_jcr_content/root/responsivegrid/container_882109576/container/flexible_column_cont/flexible_column_content_container_1/container/image.coreimg.svg/1768274028304/snowflake-logo-blue.svg\" title=\"Understanding caller’s rights and owner’s rights stored procedures\" excerpt=\"A stored procedure runs with either the caller’s rights or the owner’s rights. It cannot run with both at the same time. This topic describes the differences between a caller’s rights stored procedure and an owner’s rights stored procedure.\"] まとめ Streamlit in Snowflakeのクエリ実行は背後でストアドプロシージャが動作していることを理解した。 ストアドプロシージャはセキュリティのため、制限されたサンドボックスで動作している。 混乱する代理人問題の対応として、SnowflakeではCaller\'s rightsによる動作を許可していない。 このモデルでは、Caller\'s rights の入り込む余地がないか、既存の破壊的変更が必要となる。 そこで、常時稼働、ステートフルを前提としたコンテナインスタンスに Caller\'s rights を ホワイトリスト式で制限する「Restricted caller\'s rights」が導入された。

default eye-catch image.

Snowflake External OAuthについての公式ドキュメントを読んでみた話

はじめに Enterpriseにおいて「お前は誰か?」を確認する手段は非常に多岐にわたる。 セキュリティと絡んで手段は拡大傾向にあり、新しい認証手段への追従が求められるケースは多い。 自前で認証情報を保有、管理し、セキュリティの保証を担保した手順を用意するのは不可能に近い。 現実的には認証情報の保有と管理、および認証手段を専用のプラットフォームに移譲させたい。 実際、認証の泥臭いプロセスはIdP(Identity Provider)が面倒を見てくれる。 SnowflakeはIdPと薄く関係して、IdPによる認証結果を使い回すことができる。 SnowflakeはIdPがどういったプロセスで認証したのかは一切関与しない。 認証後、「お前にこの権限を与えて良いか?」を実装しなければならない場合、 アプリ側に機能サポートがなければ、コードでそれを保証しなければならない。 Snowflakeは、ここをExternal OAuth統合として汎化しフルにサポートしている。 具体的には、SnowflakeはExternal OAuth統合として汎化していて、 OAuth2.0認可サーバと統合し、RBACとの紐付けまでを面倒みてくれる。 RBACの最小範囲であるスキーマより細かい粒度を区別する場合でなければ、 RBACだけで区別が完了することとなり、大幅な工数削減と品質安定化を達成できる。 昔Fitbit APIのOAuth2.0フローを実装した時から始まり、 過去に何件かWebアプリ開発で認証認可まわりの実装をしたと思う。 Webアプリの認証認可F/Wはかなり枯れていて、正直中身を知らなくても書けてしまう。 開発者人口が少ないSaaSサービスであるSnowflakeがブラックボックス化した 認証認可の仕組みを読み解くのは、Webアプリのそれとは次元の違う大変さがある。 (こと認証認可の文脈では安全性の保証がセットとなるため) Snowflake External OAuthについて厳密に調べる機会があったので、 生成AIを使わず100%自分の思考と言葉で記事を起こしていく。 [arst_toc tag=\"h4\"] 認証(AuthN) 認証、つまり、Authenticationは、「お前は誰か」を確認すること。 IdPにID/PWを登録しておきID/PWを入力したりMFAを通ることで「確かに〇〇さんだ」と確認すること。 単一要素認証(SFA)、多要素認証(MFA)、パスキー認証、FIDO2認証、他、多様な認証方式がある。 またシングルサインオン(SSO)、により組織を跨ぐ連携を行うことができる。 サービス間のSSO方式としてSAML2.0、API等のSSO方式としてOIDC2.0が広く使われている。 顧客管理のIdPによる認証を本IdPに引き継ぐIDフェデレーションにより組織間認証連携を実現できる。 認可(AuthZ) 一方認可、つまり、Authorizationは、「お前にこの権限を与えて良いか」を確認すること。 認可とは「誰がどのデータにどんなルールでアクセスして良いか」をコントロールする設計パターン。 「ルール作りの設計思想」と「システム間で権限をやり取りする技術規格」がごっちゃに扱われがち だが、レイヤが異なる2つの話を分けておくと少しわかりやすくなる。 「ルール作りの設計思想」 例えば以下のようにルールを定める。 ロールベースアクセス制御/Role Based Access Control ユーザ個人ではなく役割に対して権限を付与しユーザをそのロールに所属させる方式。管理者権限のユーザには作成・削除を与え、一般権限のユーザには閲覧のみを与えるなど、一般的な認可方式。SnowflakeのロールモデルはまさにRBACに基づく。 属性ベースアクセス制御/Attribute Based Access Control ロールだけでなくユーザの所属、勤務地、アクセスする時間帯、デバイスの種類など、複数の属性(コンテキスト)を組み合わせて動的に認可を判断する方式。 「システム間で権限をやり取りする技術規格」 例えば以下のようにルールを実現する技術規格を表す。 OAuth2.0 現在のWebで最も普及している「トークンベース」の認可フレームワーク。認可サーバーが発行した「アクセストークン(時限式のカードキー)」をアプリが提示し、リソースサーバー(Snowflakeなど)がそれを検証してアクセスを許可する。「権限の証明書」としてJWT(JSON Web Token)が実際にやり取りされる。JWTは、SON形式のデータを暗号論的に署名したもので、中身に「ユーザー名」「有効期限」、「付与されたロール(権限スコープ)」などが書き込まれている。 ケルベロス認証・認可 (Kerberos) 主に一昔前からの 社内ネットワーク(Active Directory)環境などで広く使われている方式。チケット」と呼ばれる暗号化されたデータをやり取りすることで一度のログインで社内のファイルサーバーやプリンタなどの利用権限(認可)をシームレスに得る。 あああ External OAuth External OAuthは顧客のOAuth2.0認可サーバを統合してシームレスなSSOを実現する。 認証プロセスはサービス側が気にするものではなく、本機能は認可の統合であることに注意すること。 なお公式(外部 OAuth の概要)は間違いなく認証・認可と言う言葉をごっちゃにしている。 OAuth2.0はRFC6749でThe OAuth2.0 Authorization Frameworkと定義されている。 受け渡しされるトークンはOIDCのような認証トークンではなく、OAuth2.0の認可トークンである。 外部OAuthという(認可の)仕組みをSnowflakeに設定しておくことで、 「外部のIdPが認証したという証明書」をSnowflakeが安全に受け取ってデータアクセス認可する仕組みだ。 公式(外部 OAuth の概要)によると、以下に公式に対応している。 公式にない場合は、外部 OAuth 用のカスタム認証サーバーを構成するで構成できる。 なお「公式」でないからといって「非対応」ではない。「公式」になくても汎用OAuth2.0用のカスタム認証サーバーとして構成できる。 Okta - 外部OAuth用Oktaの構成 Auth0はOktaファミリーだが↑では構成できない。カスタム認証サーバーとして構成が必要 Microsoft EntraID - 外部 OAuth 用 Microsoft Entra ID の構成 Ping Identity PingFederate - 外部 OAuth 用 Microsoft Entra ID の構成 Microsoft PowerBI - Power BI SSO からSnowflakeへ 公式にはExternal OAuthのメリットとして以下が挙げられている。 トークンの発行を認証サーバーに委任し、発行されたトークンの管理に集中できるようになる。 ログイン時のセキュリティルール(MFAやIP制限、承認フローなど)を、Prj IdP側に統合できる。 ユーザがその認証と許可に関する厳しいルール(テスト)をクリアしない限り、IdPはトークンを発行しない。 怪しいユーザはSnowflakeの入り口にすら辿り着けず、データは完璧に守られる。 認証をIdPに持たせることでSnowflake側から認証情報を除去できるためセキュアになる。 一見して認証のことしか書かれていないようだが、implicitに認可が書かれている。 Snowflakeは認可をIdPに完全に移譲し、認証とセットで認可が行われたトークンを確認するだけ、 ということは、Snowflake側に認可コードを一切書くことなしに認可を実現することと同義。 External OAuthの認証部分の基本フロー 公式に基本フローの図が貼ってある。ステップ1だけ構成時にのみ行う。他は都度実行される。 最初にセキュリティ統合の構成と、アプリ内の実装が開発者側の責務となる。 ベスプラに従ってルールから逸脱しないように構成することで、後はSaaSサービス間の自動連携となる。 外部OAuth認証サーバとSnowflakeのセキュリティ統合を構成し信頼性を確立する ユーザはアプリを介してSnowflakeにアクセスしようとする。アプリはユーザを確認しようとする 認証サーバはOAuthトークンをアプリに返す SnowflakeドライバはOAuthトークンを使用して接続文字列をSnowflakeに渡す SnowflakeはOAuthトークンを検証する Snowflakeはユーザ検索を実行する Snowflakeはユーザのロールに基づいてセッションをインスタンス化する External OAuthの認可部分、スコープ いきなり「スコープ」というワードが出てくるが、これ、JWTの\"scope\"キー/バリューのこと。 OAuth2.0においてJWTで認可範囲を設定するのだ、という理解と記憶がなければ読めない。 JWTは以下のような構成となっておりscopeを格納する場所がある。 認可サーバ側で何らかの許可処理の結果、ユーザのスコープが決まり、Snowflakeに送られる。 このトークンがSnowflakeに届くと、Snowflakeはscopeキーのバリューを読み取り、 「このユーザにはST_USER_ROLEというロール(権限)を適用してセッションを始めるべき」と判断する。 { \"iss\": \"https://your-project-idp.auth0.com/\", \"sub\": \"user_12345\", \"email\": \"user@client.com\", \"exp\": 1719100000, \"scope\": \"session:role:ST_USER_ROLE\" <-- 🌟これが「スコープ」 } Okta, PingFederate, カスタムの場合は以下のパターンを使用しなければならない。 スコープ 説明 session:role:<custom_role> Snowflakeのカスタムロールにマップする。例えばsession:role:ST_USER_ROLEで、ST_USER_ROLEにマップ session:role:public Snowflakeの PUBLIC ロールにマップ session:role-any 外部OAuthサーバでのSnowflakeロール管理を行わない場合これを渡す。特定のロールを固定せず、そのユーザに付与されているロールであれば、ログイン後に自由に切り替えて(USE ROLEして)使って良い、という少し緩めの認可 なお、以下のビルトインロールはデフォルトではブロックされる。 ACCOUNTADMIN GLOBALORGADMIN ORGADMIN SECURITYADMIN Snowflake OAuthは、セッション中のロールのセカンダリロールへの切り替えをサポートしていないが、 External OAuthでのセカンダリロールの使用はサポートしている。 External OAuth特有のセキュリティの抜け穴と対策 Snowflakeにおいて、アカウントレベルでネットワークポリシーによりIP制限をかけていたとしても、 External OAuthと合わせて構成するSecurity Integrationを経由してログインしてくる場合、 そのユーザ個人のIP制限が無視されてしまう、という仕様がある。 つまり、IdP側のIP制限が破られたり、トークンが盗まれたりした場合、 攻撃者はどこからでもSnowflakeのデータにアクセスできてしまう状態になる。 Snowflakeは、External OAuth自体にもネットワークポリシーを直接紐づけることを推奨している。 具体的にはSecurity Integrationにネットワークポリシーを直接紐づける。 これによりIdPから届いたトークンであっても、ネットワークポリシーで許可されたIPアドレス以外からの リクエストであれば、Snowflakeはセッションを開始しない。 これはIdPフェデレーション等で複雑化したユーザ組織の通信経路を全て把握する必要性を言っている。 こういうの、デフォルトで安全側に振って欲しいなとは思う。 カスタム認証サーバーの構成・トークンペイロード要件 カスタム認証サーバーがSnowflakeに送信するアクセストークンには、下表が含まれている必要がある。 クレーム 説明 scp Snowflake のカスタムロールを指定する文字列が含まれていること。値として session:role:ST_USER_ROLE のような Snowflake 指定の形式の文字列を、配列またはスペース区切りの文字列で必ず埋め込まなければならない。 scope 同上。IdPプロダクトによりscpかscopeのどちらかを入れる。 aud Snowflake アカウントの完全な URL(https://.snowflakecomputing.com)が含まれている必要がある。 exp 有効時間。トークンの有効期限が UNIX タイムスタンプ(エポック秒)で刻まれている必要がある。Snowflake はトークンを受け取った瞬間の時刻とこの exp を比較します。有効期限が過去の時刻になっている(期限切れ)場合は、その時点で認可を即座に拒否する。 iss 発行者。アクセストークンを発行したプリンシパルを文字列 URI として識別。つまりトークンを発行した IdPのアイデンティティ(例: https://your-project-idp.auth0.com/)。最後のスラッシュ(/)の有無まで1文字違わず一致させる必要がある。Snowflake 側の EXTERNAL_OAUTH_ISSUER で指定した文字列と完全に一致する必要がある。 iat 発行時刻。必須。JWT が発行された時刻を識別 カスタム認証サーバーの構成・セキュリティ統合の作成 External OAuth を実現する Snowflakeのリソースの実体。 カスタム認証サーバからのアクセストークンと安全に通信して検証し、アクセストークンに 関連付けられたユーザーロールに基づいてSnowflakeへのアクセスをユーザに提供する。 create security integration external_oauth_custom type = external_oauth enabled = true external_oauth_type = custom external_oauth_issuer = \'\' external_oauth_rsa_public_key = \'\' external_oauth_audience_list = (\'\', \'\') external_oauth_token_user_mapping_claim = \'upn\' external_oauth_snowflake_user_mapping_attribute = \'login_name\'; それぞれの内容は下表の通り。 パラメータ 説明 EXTERNAL_OAUTH_ISSUER 外部認証サーバー(IdP)を一意に識別するURL(発行元URL)を指定する。IdPから発行されるアクセストークン(JWT)の iss クレームの値と完全に一致する必要がある。 EXTERNAL_OAUTH_JWS_KEYS_URL 外部認証サーバーが公開している、デジタル署名の検証に必要な公開鍵(JWKS)が配置されたURLを指定する。SnowflakeはこのURLにアクセスしてトークンの妥当性を検証する。 EXTERNAL_OAUTH_TOKEN_USER_MAPPING_CLAIM 外部認証サーバーが発行するアクセストークン(JWT)の中で、ユーザーの識別情報(メールアドレスやユーザーIDなど)が格納されている「キー(クレーム名)」を指定する。 EXTERNAL_OAUTH_SNOWFLAKE_USER_MAPPING_ATTRIBUTE トークンから抽出したユーザー識別情報を、Snowflake側の USER オブジェクトのどの属性(EMAIL_ADDRESS または LOGIN_NAME)と一致させるかを指定する。 カスタム認証サーバーの構成・テスト 公式では、最短パスで構成を検証するため、curl で HTTP Post を送る手順が書かれている。 IdP側にテストユーザを作成しておく。テストユーザはパスワードを持つ必要がある Snowflake側にも、上記と同じメールアドレス(または識別子)を持つ USER オブジェクトを事前に作っておく。login_name, または emailでマッピングする IdP側の画面でこのテスト用のアカウントを作成し、専用のClientID, ClinetSecretを取得する 次のように、 OAuth 2.0クライアントがカスタムトークンエンドポイントに POST リクエストすることを許可 OAuth 2.0の用語でいう grant_type = password(Resource Owner Password Credentials Grant)方式を使うこと。すなわち「リソース所有者に設定された付与タイプ」であり、アプリ画面を介さず、ユーザーのID/PWを直接リクエストに含めてトークンを即時発行してもらう、テスト専用の最短ルートを構築する。 準備で用意したclientID と clientSecretをHTTP Basic認証ヘッダーに含めること リクエストのBody(送信データ)には、FORM形式(application/x-www-form-urlencoded)で、テストユーザーのID/PWと、Snowflakeに渡したいスコープを指定すること curl -X POST -H \"Content-Type: application/x-www-form-urlencoded;charset=UTF-8\" --user : --data-urlencode \"username=\" --data-urlencode \"password=\" --data-urlencode \"grant_type=password\" --data-urlencode \"scope=session:role:analyst\" 公式対応認証サーバーと非公式(カスタム対応)の違い 公式対応認証サーバーと、非公式(カスタム対応)の違いをまとめてみる。 ケース1:IdPの「署名用公開鍵」がローテーション(変更)されたとき JWT(トークン)が偽造されていないかを証明するための「公開鍵」は、 セキュリティ担保のために数ヶ月ごとに自動で新しいものにローテーションするのが一般的。 公式対応の場合、SnowflakeがOkta側の鍵更新スケジュールや新しい公開鍵の 取得先をあらかじめ知っているため、Snowflake側が自動で追従する。 開発者は何のアクションも起こす必要はなく、システムは止まらない。 カスタム、つまり非公式の場合であっても基本的には指定したURL (.well-known/jwks.json)を見に行ってくれるので自動追従するが、 もしIdP側のメジャーアップデート等で「公開鍵を配置するURLの仕様そのもの」 が変わった場合は、Snowflakeの設定パラメータ(EXTERNAL_OAUTH_JWS_KEYS_URL) を開発者が手動で新しいURLに書き換えるまで、認証・認可がすべてエラーになってシステムが停止する。 IdP側のセキュリティ仕様やエンドポイントの仕様が変更されたとき 近年、サイバー攻撃の高度化に伴い、IdP側(OktaやMicrosoftなど)がトークンの発行ルールや、 検証用APIの仕様(プロトコル)をより安全なものへ強制アップデートすることがある。 SnowflakeはOktaやMicrosoftと強固な技術パートナーシップを結んでいるため、 IdP側の仕様変更がリリースされる前に、Snowflake側の「特急レーン(専用プログラム)」を 事前にアップデートして追従させる。そのため、開発者がコードや設定を修正することなく、 シームレスに新しいセキュリティ基準へ移行できる。 カスタム、つまり非マネージドの場合、Snowflakeは「汎用的なOAuth 2.0の標準規格(RFC)」に 準拠していることしか保証しない。そのため、IdP側が独自のセキュリティ拡張を行ったり、 標準規格の解釈を変更したりした場合、トークンのペイロード構造(キー・バリュー)が変わり、 Snowflakeがトークンを解読できなくなるリスクがある。 この場合、開発者がIdP側の設定を手動で修正して追従する必要がある。 まとめ SnowflakeにおけるExternal OAuth統合の仕組みを「認証」と「認可」のレイヤを分離して読んでみた。 認証・認可を完全にIdPに移譲し、Snowflakeアプリケーション内で一切の認可コードを書かずに済む。 数あるIdPのうち、いくつかについてはSnowflakeが公式対応している。 公式IdP構成はテクノロジーパートナーシップに基づき、Snowflakeのマネージド構成の一部として、 Snowflake側がIdP側の変更に自動追従する可能性が高い。結果としてダウンタイムの発生を回避できる。 公式対応IdPでなくても、OAuth2.0 RFC準拠の認証サーバとしてカスタム連携することができるが、 SnowflakeがIdP側の変更に自動追従する性質ではなく、運用者・開発者がIdP側の変更に適用する必要がある。

default eye-catch image.

Streamlit in Snowflakeの開発環境を整備して初めてのアプリケーションを実装した話

はじめに Streamlit in Snowflakeの開発を開始するには、Snowflakeアカウント、適切なIDE設定、ローカル開発環境の構築といった複数のステップが必要。この記事では、前提条件の確認、アプリケーション実装といった標準的なセットアップ手順をまとめる。 前提条件と必須の準備作業 Streamlit in Snowflakeの開発を始める前に、複数の前提条件を満たす必要がある。 前提条件の詳細: Snowflakeアカウントへのアクセス - 有効なSnowflakeアカウントと、CREATE APPLICATION PACKAGE 権限を持つロールが必須である。ロール設計を行い、この権限を付与したカスタムロールを使用する Pythonの開発環境 - Python 3.8以上がインストールされており、pipやcondaといったパッケージ管理ツールが動作する状態が前提である。Streamlit in SnowflakeはPython 3.10以上での動作を推奨している Snowparkライブラリ - ローカル開発環境にsnowpark、snowflake-snowpark-python といったパッケージをインストール済みであることが必須 Snowflake CLIツール - Snowflake提供の公式CLIツール(snow)をシステムに導入する必要がある。このツールを通じてSnowflakeを対話的に操作する 認証情報の管理 - ローカル開発では、Snowflakeへの接続情報をコードに埋め込まないことが重要である。環境変数、~/.snowsql/config ファイル、またはキーペア認証を使用して管理する。本番環境へのデプロイ時には、AWS Secrets Manager、Azure Key Vault、HashiCorp Vaultといった外部認証サービスの利用が推奨される IDE統合と開発環境の構築 Visual Studio Codeの統合により、ローカル開発フェーズ全体をエディタ内で完結させられる。Pythonコード編集、ローカルテスト実行(`streamlit run` での動作確認)、Snowflakeへのクエリ検証、デプロイまでの準備がVS Code内で実現される。一方、デプロイ後の本番環境ではSnowflakeウェブコンソール内でアプリケーションが動作する。公式のSnowflake拡張機能を利用することで、Snowflakeへの接続管理、SQL文の実行、デバッグが統一されたインターフェース内で実現される。 IDE統合のセットアップ手順: Visual Studio CodeにSnowflake拡張機能をインストールする。拡張機能マーケットプレイスから「Snowflake」を検索し、公式のSnowflake Inc.提供版をインストールする 拡張機能をインストール後、接続設定ファイル(通常は~/.snowsql/config)を確認し、接続情報が正確に記述されていることを検証する コマンドパレット(Ctrl+Shift+P または Cmd+Shift+P)からSnowflakeの接続を確立する。接続テストが成功することで、Snowflakeへの通信が確認される SQLエディタを開き、簡単なクエリ(例:SELECT CURRENT_USER())を実行してSnowflakeとの疎通確認を行う IDE統合によって、ローカルでのPythonコード編集とSnowflakeのデータ参照が同一画面で実現され、開発の効率が劇的に向上する。 GitHub Codespacesでの開発も可能: ローカルマシンの環境管理を避けたい場合、GitHub Codespacesを使用した開発も実用的に使用できる。CodespacesにおいてSnowflake拡張機能、Snowflake CLI、Streamlit CLIがサポートされている。`streamlit run` コマンドで起動したアプリケーションはCodespaces内で自動的にポート転送され、ブラウザプレビューが利用可能である。環境構築を.devcontainer/devcontainer.jsonで定義すれば、チーム全体で統一された開発環境を即座に立ち上げられる。 ローカル開発環境のセットアップ ローカルマシンでStreamlit in Snowflakeアプリケーションを開発するには、複数のPythonパッケージが必要である。仮想環境の構築を通じて、プロジェクト固有の依存関係を隔離することが実務上の標準である。 開発環境の選択肢について: Docker環境を用いてローカル開発環境を構築することも技術的には可能だが、本番環境がSnowflake内の管理コンテナ上で実行されるため、ローカルのDocker環境と本番環境の構成を統一することはできない。開発環境をDockerで隔離したとしても、本番デプロイ時には別の実行環境へ移行するため、Docker化による環境共通化のメリットは限定的である。仮想環境による環境隔離で十分であり、Docker導入による複雑さの増加は費用対効果が低い。チーム規模が大きく、開発環境の統一が重要な場合のみDocker化を検討する価値がある。 # 仮想環境を作成 python3 -m venv streamlit_env # 仮想環境を有効化(macOS/Linux) source streamlit_env/bin/activate # 仮想環境を有効化(Windows) streamlit_envScriptsactivate # 必須パッケージをインストール pip install streamlit pip install snowflake-snowpark-python pip install snowflake-cli-labs # requirements.txtを作成し、プロジェクトの依存関係を記録 pip freeze > requirements.txt requirements.txtファイルの内容例: streamlit==1.28.0 snowflake-snowpark-python==1.10.0 snowflake-cli-labs==2.0.0 pandas==2.0.0 仮想環境の隔離により、異なるプロジェクト間での依存パッケージの競合を回避できる。これは本番環境へのデプロイ時にも重要であり、requirements.txtはアプリケーションと共にSnowflakeにアップロードされる。 ローカルでの初期テストと動作確認 ローカル開発環境が構築されたら、Streamlitが正常に動作するか確認する必要がある。最小限のアプリケーションコードでSnowflakeへの接続テストを行う。 # app.py import streamlit as st from snowflake.snowpark.context import get_active_session st.title(\"Streamlit in Snowflake - 初期テスト\") try: session = get_active_session() user = session.sql(\"SELECT CURRENT_USER()\").collect()[0][0] st.success(f\"Snowflakeへの接続成功。現在のユーザー: {user}\") except Exception as e: st.error(f\"接続エラー: {str(e)}\") # 簡単なデータクエリ if st.checkbox(\"テーブル一覧を表示\"): try: databases = session.sql(\"SHOW DATABASES\").collect() st.write(f\"利用可能なデータベース数: {len(databases)}\") except Exception as e: st.error(f\"クエリ実行エラー: {str(e)}\") このテストアプリケーションを実行する場合、ローカルではStreamlit CLIでの実行が可能である。 streamlit run app.py ただし、ローカルでの実行にはSnowflakeへの認証情報が必要である。環境変数で接続情報を提供する方法が一般的である。 必須の環境変数: SNOWFLAKE_ACCOUNT - Snowflakeアカウント識別子(例:xy12345.us-east-1) SNOWFLAKE_USER - ログインユーザー名 SNOWFLAKE_PASSWORD - ユーザーのパスワード(パスワード認証の場合) SNOWFLAKE_WAREHOUSE - クエリ実行用のウェアハウス名 SNOWFLAKE_DATABASE - デフォルトのデータベース名 SNOWFLAKE_SCHEMA - デフォルトのスキーマ名 キーペア認証を用いる場合は、SNOWFLAKE_PASSWORD の代わりに SNOWFLAKE_PRIVATE_KEY_PATH と SNOWFLAKE_PRIVATE_KEY_PASSPHRASE を設定する。環境変数の設定例: export SNOWFLAKE_ACCOUNT=\"xy12345.us-east-1\" export SNOWFLAKE_USER=\"developer_user\" export SNOWFLAKE_PASSWORD=\"your_secure_password\" export SNOWFLAKE_WAREHOUSE=\"dev_warehouse\" export SNOWFLAKE_DATABASE=\"analytics_db\" export SNOWFLAKE_SCHEMA=\"dev_schema\" # その後、streamlit run app.py を実行 streamlit run app.py 別の方法として、~/.snowsql/config ファイルに接続情報を記述し、Snowpark が自動的に読み込む設定も可能である。 初めてのアプリケーション実装 前提条件とローカル環境が整備されたら、Snowflakeアカウント内に実際のアプリケーションを作成する準備が整う。最小限のアプリケーションを実装し、Snowflakeへのデプロイが正常に機能することを確認する。 最小限のアプリケーション実装: # app.py import streamlit as st st.title(\"初めてのアプリケーション\") st.write(\"Hello World.\") このシンプルな実装で、Streamlit in Snowflakeへのデプロイが正常に完了し、本番環境でUIが表示されることを確認できる。Snowflake側でアプリケーション作成用のステージとメタデータを準備する必要がある。 -- Snowflakeで実行:アプリケーション用のステージを作成 CREATE STAGE IF NOT EXISTS app_stage; -- アプリケーション設定ファイルを準備 -- manifest.ymlを作成してステージにアップロード Snowflake CLIを使用して、ローカルのアプリケーションコードをSnowflakeにデプロイする。 # Snowflake CLIでプロジェクトを初期化 snow project init # ローカルの開発コードをSnowflakeにデプロイ snow app deploy デプロイ後の検証: Snowflakeウェブコンソールにログインし、アプリケーション一覧から新規作成したアプリケーションが表示されていることを確認する アプリケーションをクリックして開き、UIが正常に表示され、Snowflakeへのクエリが実行される状態を確認する デプロイ後の最初の実行はコールドスタートのため、数秒の遅延が発生するが、以後のアクセスは高速化される 以下はエラーハンドリングを組み込んだ実装例。Snowflake環境において発生したネットワークエラー、タイムアウト、権限不足といった例外を補足し表示してみた。 # エラーハンドリングを含む実装例 import streamlit as st from snowflake.snowpark.context import get_active_session st.set_page_config(page_title=\"データダッシュボード\", layout=\"wide\") try: session = get_active_session() st.header(\"データ参照アプリケーション\") # ユーザー情報の取得 current_user = session.sql(\"SELECT CURRENT_USER()\").collect()[0][0] st.sidebar.write(f\"ユーザー: {current_user}\") # データベース選択 db_list = session.sql(\"SHOW DATABASES\").collect() databases = [row[1] for row in db_list] selected_db = st.selectbox(\"データベースを選択\", databases) st.success(f\"接続完了\") except Exception as e: st.error(f\"エラーが発生しました: {type(e).__name__}\") st.info(\"管理者に連絡してください\") まとめ 本記事では、前提条件の確認、IDE統合(Visual Studio Code、Snowflake拡張機能のセットアップ)、GitHub Codespacesでの開発環境構築の検討、ローカル開発環境の準備(仮想環境、パッケージインストール、Docker化の考慮を含む)について言及した。また、ローカルテスト実行時の環境変数設定方法についてまとめた。最後に、最小限のアプリケーションを実装し、Snowflakeへデプロイ後、動作確認を行なった。

default eye-catch image.

Streamlit in Snowflakeにおける分離コンテナ環境とセッション管理の仕組みを理解した話

はじめに Streamlit in Snowflakeで本番環境のアプリケーションを構築する際、実行環境とセッション管理の仕組みを理解することは必須である。標準的なStreamlitとは異なり、Snowflake統合版はSnowflakeの管理するコンテナ内で実行され、アプリケーションのライフサイクル、パフォーマンス特性、状態管理が大きく異なる。本稿では、この実行モデルの核心部分に焦点を当て、本番環境での実装判断に必要な知識を整理する。標準的なStreamlitの開発経験がある技術者であっても、Snowflake統合版の独特なアーキテクチャを把握することで、より堅牢で効率的なアプリケーション設計が可能となる。 Snowflakeの管理するコンテナ内での実行 Streamlit in Snowflakeのアプリケーションは、Snowflakeのアカウント内で管理された隔離されたコンテナプロセス上で実行される。ローカルマシンのPythonプロセスのように直接制御することはなく、Snowflakeのインフラストラクチャが実行環境全体を統制する。 実行環境の核心的な特性: 各アプリケーションはSnowflakeのアカウント領域内で独立した仮想環境として分離されており、他のテナントや他のアプリケーションとの干渉を受けない アプリケーションの起動、実行、終了はSnowflakeの制御下にあり、ユーザーのアクセスパターンに応じた動的なスケーリングが自動的に実行される Pythonランタイムは事前にコンテナ内にプリロードされており、ユーザーがアプリケーションにアクセスした時点でコードの実行が即座に開始される コンテナはステートレスな設計であり、複数のユーザーセッション間でローカルのファイルシステム上の状態は保持されない メモリ、CPU、ネットワーク帯域幅などのリソースは制限されており、無限に大規模なデータセットをメモリに展開することはできない この設計により、スケーラビリティと管理負荷の削減が実現される。開発者はインフラストラクチャの保守運用から解放され、アプリケーション本体の開発に集中できる。一方で、アプリケーション開発者は「各セッションは独立している」「ローカル状態は永続しない」という前提でコーディングする必要があり、この認識がなければ本番環境で予期しない動作が発生する可能性がある。 ExecutionContextとSnowflakeのセッション情報へのアクセス Streamlit in Snowflakeで最も重要な概念がExecutionContextである。これはSnowflakeのセッション情報とアプリケーション実行の状態を統合したオブジェクトであり、アプリケーションコード内から直接アクセスすることが可能である。 ExecutionContextを通じて、認証済みユーザーの識別子、割り当てられたウェアハウス、セッションのロール情報、現在のデータベースとスキーマといった情報が取得できる。これらの情報はSnowflakeの権限管理体系と一体化しており、アプリケーションが実行するすべてのSQL文はこのコンテキストの権限に基づいて検証される。 from snowflake.snowpark.context import get_active_session session = get_active_session() # 現在のユーザーを取得 current_user = session.sql(\"SELECT CURRENT_USER()\").collect()[0][0] # 割り当てられたウェアハウスを確認 current_warehouse = session.sql(\"SELECT CURRENT_WAREHOUSE()\").collect()[0][0] # 現在のロール情報 current_role = session.sql(\"SELECT CURRENT_ROLE()\").collect()[0][0] # アプリケーション領域のスキーマを取得 current_schema = session.sql(\"SELECT CURRENT_SCHEMA()\").collect()[0][0] ExecutionContextから取得可能な情報の実用的な用途: 認証済みユーザーID:このユーザーが属するテナント、部門、権限レベルをデータベースから検索し、表示内容を動的に制御する基盤となる 割り当てられたウェアハウス:クエリの実行リソースがどのウェアハウスに割り当てられるかを把握し、多くの重い処理が実行される時間帯を避けるといった最適化判断に活用される セッションのロール情報:ロールベースアクセス制御の実装において、現在のユーザーが実行可能な操作を制限する際に利用される 現在のデータベースおよびスキーマ:アプリケーションが参照するテーブルやストアドプロシージャの名前空間を把握し、正確なクエリを構築する際に用いられる ExecutionContextはSnowflakeの行レベルセキュリティ(RLS)および動的データマスキング(DDM)と組み合わせることで、マルチテナント環境でのデータ分離を実装できる。ユーザーが属するテナント情報をExecutionContextから抽出し、その情報をSQLクエリに動的にフィルタリング条件として付与するパターンが一般的である。 セッション状態の管理と永続化戦略 Streamlit in Snowflakeでは、標準的なStreamlitの`st.session_state`メカニズムが使用される。ただし、その永続性と可用性については、通常のStreamlitと異なる考慮が必要である。 セッション状態の保持期間と動作: ユーザーがブラウザを閉じるまで、またはセッションのタイムアウト(デフォルト約60分)が発生するまで、`st.session_state`に格納されたPythonオブジェクトは保持される セッション終了後、メモリ上の状態は完全に消失し、その後のユーザーアクセスでは初期化された状態から再出発する 複数のユーザーセッションが並行して実行される場合、各セッションのメモリ空間は完全に独立しており、相互干渉は発生しない 分散環境ではコンテナのリバランシングが発生する可能性があり、メモリ内状態への依存度が高いと予期しない状態喪失が発生する危険性がある セッション状態を効果的に使用するパターンとしては、ユーザーの入力フォーム状態、フィルタ条件、ページネーション状態、一時的なキャッシュなど、セッション内での短期的な状態に限定することが推奨される。 import streamlit as st from snowflake.snowpark.context import get_active_session session = get_active_session() # セッション状態で一時的なUIフィルタを保持 if \'selected_date_range\' not in st.session_state: st.session_state.selected_date_range = (None, None) if \'filter_status\' not in st.session_state: st.session_state.filter_status = \'all\' # ユーザーインタラクションでセッション状態を更新 date_range = st.date_input(\"期間を選択\", st.session_state.selected_date_range) st.session_state.selected_date_range = date_range # 永続化が必要な設定はSnowflakeテーブルに明示的に保存 if st.button(\'設定を保存\'): current_user = session.sql(\"SELECT CURRENT_USER()\").collect()[0][0] session.sql(f\"\"\" UPDATE user_preferences SET ui_settings = parse_json(?) WHERE user_id = ? \"\"\", params=[str(st.session_state.filter_status), current_user]).collect() st.success(\"設定を保存しました\") セッション終了後も保持する必要があるデータ(ユーザー設定、保存された検索条件、レポート結果など)は、Snowflakeのテーブルに明示的に書き込む必要がある。この明確な分離により、アプリケーションの動作が予測可能になり、バグの温床となる隠れた状態共有が回避される。 パフォーマンス特性とコールドスタート最適化 Streamlit in Snowflakeのパフォーマンス特性は、コンテナの起動時間、リソースの割り当て、クエリの実行効率によって大きく影響を受ける。 パフォーマンスに関わる重要な指標: 初期化時間:ユーザーがアプリケーションに初めてアクセスする際、Snowflakeがコンテナを起動し、Pythonランタイムを初期化するまでに3秒から10秒程度要する場合がある。これをコールドスタートと呼ぶ SQLクエリ実行時間:SQLクエリの実行時間は主にSnowflakeのクエリプランニングと分散処理の時間に依存し、ネットワークレイテンシは相対的に最小限である メモリ制限:各コンテナプロセスのメモリは制限されており、gigabyte単位の大規模なデータセットを一度にメモリにロードすることは技術的に不可能である リソース競合:同一のウェアハウス上で複数のアプリケーションやクエリが並行実行される場合、リソース争奪による性能低下が発生する可能性がある キャッシュ効果:頻繁にアクセスされるテーブルやクエリ結果はSnowflakeの内部キャッシュに保持され、2度目以降のアクセスは高速化される 本番環境ではコールドスタート対策として、アプリケーション初期化時の処理を最小化し、必要なデータは遅延読み込みするパターンが採用される。また、複雑な分析処理やデータ変換はSnowflakeのストアドプロシージャに委譲し、アプリケーション層では結果の表示と対話的なUIの提供に専念することが効率的である。 ウェアハウスとリソース割り当ての考慮 Streamlit in Snowflakeのアプリケーションが実行するすべてのSQL文は、指定されたウェアハウスのコンピュート能力を消費する。ウェアハウスの選択は、クエリの実行速度、同時実行可能なセッション数、運用コストに大きな影響を与える。 ウェアハウス選択の実務的考慮: 小規模なウェアハウス(XSMALL、SMALL)はコストが低く、軽量なクエリや低アクセス頻度のアプリケーションに適しており、一方で大量のユーザーからの並行アクセスには不向きである 大規模なウェアハウス(LARGE、XLARGE以上)は並行クエリ処理の能力が高く、多くのユーザーからの同時アクセスに対応できるが、アイドル状態であってもコストが発生する オートスケール機能を有効にすることで、負荷に応じたウェアハウスの自動拡張が可能になり、ピーク時の対応と非ピーク時のコスト削減を両立させられる 複数のアプリケーションが同一ウェアハウスを共有する場合、負荷分散戦略を立案し、一つのアプリケーションの過度なリソース消費が他のアプリケーションに悪影響を与えないようにする必要がある リソースの効率的な利用と高いパフォーマンスの両立は、ウェアハウスのサイズ選択、クエリの最適化、適切なキャッシング戦略によって初めて実現される。 まとめ Streamlit in Snowflakeは、Snowflakeの管理するマネージドコンテナ環境内で動作し、ExecutionContextを通じてSnowflake側のセッション情報に直接アクセスできるアーキテクチャである。セッション状態は短期的なUI状態の保持に限定し、永続化が必要なデータはSnowflakeテーブルに委譲することが設計の原則である。また、コールドスタートやリソース共有の課題を念頭に置いて、初期化処理の最小化とクエリの最適化によるパフォーマンス改善アプローチを検討する必要がある。これらの理解があれば、本番環境での実装判断が格段に容易になり、堅牢で拡張性の高いアプリケーション設計が可能となる。

default eye-catch image.

Snowflake MCPサーバを試してみた

何周遅れか分からないが、Snowflake MCPサーバを試してみたのでアウトプットしてみる。 AI AgentはClaude Code。MCPの構築と接続設定自体をClaude Codeで行なった。 この記事で使用したMCPサーバは以下。いわゆる野良MCPサーバ。 [clink implicit=\"false\" url=\"https://github.com/isaacwasserman/mcp-snowflake-server\" imgurl=\"https://camo.githubusercontent.com/bdcfca988b369e51051c3201cedfc429354b0801a0c5d88aa3eb00ae37e7188b/68747470733a2f2f6d736565702e6e65742f70722f69736161637761737365726d616e2d6d63702d736e6f77666c616b652d7365727665722d62616467652e706e67\" title=\"Snowflake MCP Server\" excerpt=\"A Model Context Protocol (MCP) server implementation that provides database interaction with Snowflake. This server enables running SQL queries via tools and exposes data insights and schema context as resources.\"] [arst_toc tag=\"h4\"] 前提となる環境 Macにnode、uv、Claude Codeを導入済み。 # 諸々のバージョンは以下 $ sw_vers ProductName: macOS ProductVersion: 15.6 BuildVersion: 24G84 # nodeは導入済み $ node --version v24.4.1 # uvは導入済み $ uv --version 0.8.13 (ede75fe62 2025-08-21)0.8.13 (ede75fe62 2025-08-21) # Claude Codeは導入済み $ claude --version 1.0.89 (Claude Code) # 検証用ディレクトリの作成と移動。以降ここで検証を実施。 $ mkdir snowflake-mcp-server && cd $_ 環境構築 プロンプトとその回答は省略する。要件を伝え環境構築を実施してもらった。 Claudeに対するMCPの設定方法としてグローバルに設定する方法と、 プロジェクト個別に設定する方法があるが、当初グローバル設定の方法を案内された。 2回目のプロンプトでプロジェクト個別設定に変更してもらった。 最終的なファイル構成は以下の通り。 $ tree . . ├── mcp-snowflake-server │   ├── Dockerfile │   ├── example_connections.toml │   ├── LICENSE │   ├── pyproject.toml │   ├── README.md │   ├── runtime_config.json │   ├── src │   │   └── mcp_snowflake_server │   │   ├── __init__.py │   │   ├── db_client.py │   │   ├── serialization.py │   │   ├── server.py │   │   └── write_detector.py │   └── uv.lock ├── PROJECT_SPECIFIC_SETUP.md ├── SETUP.md ├── snowflake_connections.toml └── test_server.py snowflake_connections.toml に Snowflakeの接続設定を書くだけで終了。 試してみた claudeを起動すると .mcp.json が認識された。 $ claude ╭─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ │ │ │ New MCP server found in .mcp.json: snowflake │ │ │ │ MCP servers may execute code or access system resources. All tool calls require approval. Learn more in the MCP documentation │ │ (​https://docs.anthropic.com/s/claude-code-mcp​). │ │ │ │ ❯ 1. Use this and all future MCP servers in this project │ │ 2. Use this MCP server │ │ 3. Continue without using this MCP server │ │ │ ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ > Snowflakeのデータベース `sample_db` におけるスキーマの一覧を教えてください。 ⏺ snowflake - list_schemas (MCP)(database: \"sample_db\") ⎿  type: data data_id: e40f8385-e3c0-4848-aa36-c1b8fada788f database: sample_db … +8 lines (ctrl+r to expand) ⎿  [Resource from snowflake at data://e40f8385-e3c0-4848-aa36-c1b8fada788f] { \"type\": \"data\", \"data_id\": \"e40f8385-e3c0-4848-aa36-c1b8fada788f\", … +25 lines (ctrl+r to expand) ⏺ sample_db データベースには以下の7つのスキーマがあります: - CLEANSED - INTERMEDIATE - PUBLIC - SAMPLE_SCHEMA > cleansedスキーマ内のテーブルの一覧を教えてください。 ⏺ CLEANSED スキーマには以下の19のテーブルがあります: ... メタ情報(description) dbtでモデルを作る際にdescriptionを付与することで、テーブル、カラムにdescriptionを付与できる。 その先、セマンティックレイヤーの存在がチラつくが、descriptionだけでもそこそこいける。 まとめ Claude Codeにローカル起動型(Dockerfile)のSnowflake MCPサーバを導入してみた。 その際、Claude Code自身にMCPサーバの導入を依頼し、ほぼノーコードで導入できることを確認した。 テーブル、カラムにdescriptionを付与しておくと、MCPサーバを通じて取得・消費される。 クエリによって得られる知識がClaudeに蓄積され、知識の文脈で会話ができるようになることを確認した。

default eye-catch image.

Fellegi-Sunterモデルに基づく確率的名寄せパッケージ Splinkを試してみる

[mathjax] Record Linkage、Entity Recognitionなど、いわゆる「名寄せ」問題には、割とさまざまな解決策が 提案されている。その1つに確率論的な数学的背景を持つFellegi-Sunterモデルを実装したSplinkがある。 数学的背景について咀嚼して説明してくれているサイトはあまりなく、Splinkの公式ドキュメントが唯一 に近いソースだろうと思う。 今回は5万人の人物データを使って重複を除去するSplinkの主要な機能のサンプルを追ってみた。 数式を読んだところで実りは薄い、ということもあり、なるべく数式を読まずに読み進めたが、 モデルのパラメタ、予測結果を解釈する際にどうしても数式を読まないとわからない箇所があった。 特にベイズ統計について理解がかなり浅くて、正直なところ理解しきれてはいない。 Splinkの公式ドキュメントを読み進めていった際に気付いたメモを書いてみることにした。 [arst_toc tag=\"h4\"] 環境構築・準備 環境構築の実装部分は主な学習要素ではないのでClaude Codeでサクッと作ってみた。 まずはPlan modeで計画を立ててもらう。これぐらいだとほぼ言い値で使える。 DuckDB、Splinkを使う最小構成の環境を作ります。環境はDocker composeで構築します。計画を立ててください。 ⏺ DuckDB、Splinkを使うDocker Compose環境の計画を完了しました。 計画概要 1. 設計文書: .tmp/design.mdで要件、アーキテクチャ、技術仕様を定義 2. タスク管理: .tmp/task.mdで詳細な実装手順を3フェーズに分けて整理 主要コンポーネント - Jupyter Lab環境: DuckDB、Splink、必要なライブラリがプリインストール - データボリューム: サンプルデータ、notebook、出力用のディレクトリ - 最小構成: 開発・学習用途に特化した軽量環境 次のステップ 実装を開始する場合は、Phase 1から順番に進めます: 1. Docker設定ファイル作成 2. ディレクトリ構造構築 3. サンプル実装 4. テストと検証 計画の詳細は.tmp/design.mdと.tmp/task.mdをご確認ください。実装を開始しますか? 実際にファイルを作ってもらうと以下のような構成となる。 得られる中間成果物である docker-compose.yml、Dockerfile は本記事では省略する。 . ├── data ├── docker-compose.yml ├── Dockerfile ├── notebooks ├── output └── requirements.txt 普通に http://localhost:8888 で JupyterLab が開く。 使用するサンプルデータ 5万人の人物データを使って名寄せを行うサンプル。おそらくSplinkの用途として最初に思いつくやつ。 Splinkにデータをロードする前に必要なデータクリーニング手順について説明がある。 公式によると、まずは行に一意のIDを割り当てる必要がある。 データセット内で一意となるIDであって、重複除去した後のエンティティを識別するIDのことではない。 [clink implicit=\"false\" url=\"https://moj-analytical-services.github.io/splink/demos/tutorials/01_Prerequisites.html\" imgurl=\"https://user-images.githubusercontent.com/7570107/85285114-3969ac00-b488-11ea-88ff-5fca1b34af1f.png\" title=\"Data Prerequisites\" excerpt=\"Splink では、リンクする前にデータをクリーンアップし、行に一意の ID を割り当てる必要があります。このセクションでは、Splink にデータをロードする前に必要な追加のデータクリーニング手順について説明します。\"] 使用するサンプルデータは以下の通り。 from splink import splink_datasets df = splink_datasets.historical_50k df.head() データの分布を可視化 splink.exploratoryのprofile_columnsを使って分布を可視化してみる。 from splink import DuckDBAPI from splink.exploratory import profile_columns db_api = DuckDBAPI() profile_columns(df, db_api, column_expressions=[\"first_name\", \"substr(surname,1,2)\"]) 同じ姓・名の人が大量にいることがわかる。 ブロッキングとブロッキングルールの評価 テーブル内のレコードが他のレコードと「同一かどうか」を調べるためには、 基本的には、他のすべてのレコードとの何らかの比較操作を行うこととなる。 全てのレコードについて全てのカラム同士を比較したいのなら、 対象のテーブルをCROSS JOINした結果、各カラム同士を比較することとなる。 SELECT ... FROM input_tables as l CROSS JOIN input_tables as r あるカラムが条件に合わなければ、もうその先は見ても意味がない、 というケースは多い。例えば、まず first_name 、surname が同じでなければ、 その先の比較を行わない、というのはあり得る。 SELECT ... FROM input_tables as l INNER JOIN input_tables as r ON l.first_name = r.first_name AND l.surname = r.surname このような考え方をブロッキング、ON句の条件をブロッキングルールと言う。 ただ、これだと性と名が完全一致していないレコードが残らない。 そこで、ブロッキングルールを複数定義し、いずれかが真であれば残すことができる。 ここでポイントなのが、ブロッキングルールを複数定義したとき、 それぞれのブロッキングルールで重複して選ばれるレコードが発生した場合、 Splinkが自動的に排除してくれる。 このため、ブロッキングルールを重ねがけすると、最終的に残るレコード数は一致する。 ただ、順番により、同じルールで残るレコード数は変化する。 逆に言うと、ブロッキングルールを足すことで、重複除去後のOR条件が増えていく。 積算グラフにして、ブロッキングルールとその順番の効果を見ることができる。 from splink import DuckDBAPI, block_on from splink.blocking_analysis import ( cumulative_comparisons_to_be_scored_from_blocking_rules_chart, ) blocking_rules = [ block_on(\"substr(first_name,1,3)\", \"substr(surname,1,4)\"), block_on(\"surname\", \"dob\"), block_on(\"first_name\", \"dob\"), block_on(\"postcode_fake\", \"first_name\"), block_on(\"postcode_fake\", \"surname\"), block_on(\"dob\", \"birth_place\"), block_on(\"substr(postcode_fake,1,3)\", \"dob\"), block_on(\"substr(postcode_fake,1,3)\", \"first_name\"), block_on(\"substr(postcode_fake,1,3)\", \"surname\"), block_on(\"substr(first_name,1,2)\", \"substr(surname,1,2)\", \"substr(dob,1,4)\"), ] db_api = DuckDBAPI() cumulative_comparisons_to_be_scored_from_blocking_rules_chart( table_or_tables=df, blocking_rules=blocking_rules, db_api=db_api, link_type=\"dedupe_only\", ) 積算グラフは以下の通り。積み上がっている数値は「比較の数」。 要は、論理和で条件を足していって、次第に緩和されている様子がわかる。 DuckDBでは比較の数を2,000万件以内、Athena,Sparkでは1億件以内を目安にせよとのこと。 比較の定義 Splinkは Fellegi-Sunter model モデル (というかフレームワーク) に基づいている。 https://moj-analytical-services.github.io/splink/topic_guides/theory/fellegi_sunter.html 各カラムの同士をカラムの特性に応じた距離を使って比較し、重みを計算していく。 各カラムの比較に使うためのメソッドが予め用意されているので、特性に応じて選んでいく。 以下では、first_name, sur_name に ForenameSurnameComparison が使われている。 dobにDateOfBirthComparison、birth_place、ocupationにExactMatchが使われている。 import splink.comparison_library as cl from splink import Linker, SettingsCreator settings = SettingsCreator( link_type=\"dedupe_only\", blocking_rules_to_generate_predictions=blocking_rules, comparisons=[ cl.ForenameSurnameComparison( \"first_name\", \"surname\", forename_surname_concat_col_name=\"first_name_surname_concat\", ), cl.DateOfBirthComparison( \"dob\", input_is_string=True ), cl.PostcodeComparison(\"postcode_fake\"), cl.ExactMatch(\"birth_place\").configure(term_frequency_adjustments=True), cl.ExactMatch(\"occupation\").configure(term_frequency_adjustments=True), ], retain_intermediate_calculation_columns=True, ) # Needed to apply term frequencies to first+surname comparison df[\"first_name_surname_concat\"] = df[\"first_name\"] + \" \" + df[\"surname\"] linker = Linker(df, settings, db_api=db_api) ComparisonとComparison Level ここでSplinkツール内の比較の概念の説明。以下の通り概念に名前がついている。 Data Linking Model ├─-- Comparison: Date of birth │ ├─-- ComparisonLevel: Exact match │ ├─-- ComparisonLevel: One character difference │ ├─-- ComparisonLevel: All other ├─-- Comparison: First name │ ├─-- ComparisonLevel: Exact match on first_name │ ├─-- ComparisonLevel: first_names have JaroWinklerSimilarity > 0.95 │ ├─-- ComparisonLevel: first_names have JaroWinklerSimilarity > 0.8 │ ├─-- ComparisonLevel: All other モデルのパラメタ推定 モデルの実行に必要なパラメタは以下の3つ。Splinkを用いてパラメタを得る。 ちなみに u は \"\'U\'nmatch\"、m は \"\'M\'atch\"。背後の数式の説明で現れる。 No パラメタ 説明 1 無作為に選んだレコードが一致する確率 入力データからランダムに取得した2つのレコードが一致する確率 (通常は非常に小さい数値) 2 u値(u確率) 実際には一致しないレコードの中で各 ComparisonLevel に該当するレコードの割合。具体的には、レコード同士が同じエンティティを表すにも関わらず値が異なる確率。例えば、同じ人なのにレコードによって生年月日が違う確率。これは端的には「データ品質」を表す。名前であればタイプミス、別名、ニックネーム、ミドルネーム、結婚後の姓など。 3 m値(m確率) 実際に一致するレコードの中で各 ComparisonLevel に該当するレコードの割合。具体的には、レコード同士が異なるエンティティを表すにも関わらず値が同じである確率。例えば別人なのにレコードによって性・名が同じ確率 (同姓同名)。性別は男か女かしかないので別人でも50%の確率で一致してしまう。 無作為に選んだレコードが一致する確率 入力データからランダムに抽出した2つのレコードが一致する確率を求める。 値は0.000136。すべての可能なレコードのペア比較のうち7,362.31組に1組が一致すると予想される。 合計1,279,041,753組の比較が可能なため、一致するペアは合計で約173,728.33組になると予想される、 とのこと。 linker.training.estimate_probability_two_random_records_match( [ block_on(\"first_name\", \"surname\", \"dob\"), block_on(\"substr(first_name,1,2)\", \"surname\", \"substr(postcode_fake,1,2)\"), block_on(\"dob\", \"postcode_fake\"), ], recall=0.6, ) > Probability two random records match is estimated to be 0.000136. > This means that amongst all possible pairwise record comparisons, > one in 7,362.31 are expected to match. > With 1,279,041,753 total possible comparisons, > we expect a total of around 173,728.33 matching pairs u確率の推定 実際には一致しないレコードの中でComparisonの評価結果がPositiveである確率。 基本、無作為に抽出したレコードは一致しないため、「無作為に抽出したレコード」を 「実際には一致しないレコード」として扱える、という点がミソ。 probability_two_random_records_match によって得られた値を使ってu確率を求める。 estimate_u_using_random_sampling によって、ラベルなし、つまり教師なしでu確率を得られる。 レコードのペアをランダムでサンプルして上で定義したComparisonを評価する。 ランダムサンプルなので大量の不一致が発生するが、各Comparisonにおける不一致の分布を得ている。 これは、例えば性別について、50%が一致、50%が不一致である、という分布を得ている。 一方、例えば生年月日について、一致する確率は 1%、1 文字の違いがある確率は 3%、 その他はすべて 96% の確率で発生する、という分布を得ている。 linker.training.estimate_u_using_random_sampling(max_pairs=5e6) > ----- Estimating u probabilities using random sampling ----- > > Estimated u probabilities using random sampling > > Your model is not yet fully trained. Missing estimates for: > - first_name_surname (no m values are trained). > - dob (no m values are trained). > - postcode_fake (no m values are trained). > - birth_place (no m values are trained). > - occupation (no m values are trained). m確率の推定 「実際に一致するレコード」の中で、Comparisonの評価がNegativeになる確率。 そもそも、このモデルを使って名寄せ、つまり「一致するレコード」を見つけたいのだから、 モデルを作るために「実際に一致するレコード」を計算しなければならないのは矛盾では..となる。 無作為抽出結果から求められるu確率とは異なり、m確率を求めるのは難しい。 もしラベル付けされた「一致するレコード」、つまり教師データセットがあるのであれば、 そのデータセットを使ってm確率を求められる。 例えば、日本人全員にマイナンバーが振られて、全てのレコードにマイナンバーが振られている、 というアナザーワールドがあるのであれば、マイナンバーを使ってm確率を推定する。(どういう状況??) ラベル付けされたデータがないのであれば、EMアルゴリズムでm確率を求めることになっている。 EMアルゴリズムは反復的な手法で、メモリや収束速度の点でペア数を減らす必要があり、 例ではブロッキングルールを設定している。 以下のケースでは、first_nameとsurnameをブロッキングルールとしている。 つまり、first_name, surnameが完全に一致するレコードについてペア比較を行う。 この仮定を設定したため、first_name, surname (first_name_surname) のパラメタを推定できない。 training_blocking_rule = block_on(\"first_name\", \"surname\") training_session_names = ( linker.training.estimate_parameters_using_expectation_maximisation( training_blocking_rule, estimate_without_term_frequencies=True ) ) > ----- Starting EM training session ----- > > Estimating the m probabilities of the model by blocking on: > (l.\"first_name\" = r.\"first_name\") AND (l.\"surname\" = r.\"surname\") > > Parameter estimates will be made for the following comparison(s): > - dob > - postcode_fake > - birth_place > - occupation > > Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: > - first_name_surname > > Iteration 1: Largest change in params was 0.248 in probability_two_random_records_match > Iteration 2: Largest change in params was 0.0929 in probability_two_random_records_match > Iteration 3: Largest change in params was -0.0237 in the m_probability of birth_place, level `Exact match on > birth_place` > Iteration 4: Largest change in params was 0.00961 in the m_probability of birth_place, level `All other >comparisons` > Iteration 5: Largest change in params was -0.00457 in the m_probability of birth_place, level `Exact match on birth_place` > Iteration 6: Largest change in params was -0.00256 in the m_probability of birth_place, level `Exact match on birth_place` > Iteration 7: Largest change in params was 0.00171 in the m_probability of dob, level `Abs date difference Iteration 8: Largest change in params was 0.00115 in the m_probability of dob, level `Abs date difference Iteration 9: Largest change in params was 0.000759 in the m_probability of dob, level `Abs date difference Iteration 10: Largest change in params was 0.000498 in the m_probability of dob, level `Abs date difference Iteration 11: Largest change in params was 0.000326 in the m_probability of dob, level `Abs date difference Iteration 12: Largest change in params was 0.000213 in the m_probability of dob, level `Abs date difference Iteration 13: Largest change in params was 0.000139 in the m_probability of dob, level `Abs date difference Iteration 14: Largest change in params was 9.04e-05 in the m_probability of dob, level `Abs date difference <= 10 year` 同様にdobをブロッキングルールに設定して実行すると、dob以外の列についてパラメタを推定できる。 training_blocking_rule = block_on(\"dob\") training_session_dob = ( linker.training.estimate_parameters_using_expectation_maximisation( training_blocking_rule, estimate_without_term_frequencies=True ) ) > ----- Starting EM training session ----- > > Estimating the m probabilities of the model by blocking on: > l.\"dob\" = r.\"dob\" > > Parameter estimates will be made for the following comparison(s): > - first_name_surname > - postcode_fake > - birth_place > - occupation > > Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: > - dob > > Iteration 1: Largest change in params was -0.474 in the m_probability of first_name_surname, level `Exact match on first_name_surname_concat` > Iteration 2: Largest change in params was 0.052 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 3: Largest change in params was 0.0174 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 4: Largest change in params was 0.00532 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 5: Largest change in params was 0.00165 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 6: Largest change in params was 0.00052 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 7: Largest change in params was 0.000165 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 8: Largest change in params was 5.29e-05 in the m_probability of first_name_surname, level `All other comparisons` > > EM converged after 8 iterations > > Your model is not yet fully trained. Missing estimates for: > - first_name_surname (some u values are not trained). モデルパラメタの可視化 m確率、u確率の可視化。 マッチウェイトの可視化。マッチウェイトは (log_2 (m / u))で計算される。 linker.visualisations.match_weights_chart() モデルの保存と読み込み 以下でモデルを保存できる。 settings = linker.misc.save_model_to_json( \"./saved_model_from_demo.json\", overwrite=True ) 以下で保存したモデルを読み込める。 import json settings = json.load( open(\'./saved_model_from_demo.json\', \'r\') ) リンクするのに十分な情報が含まれていないレコード 「John Smith」のみを含み、他のすべてのフィールドがnullであるレコードは、 他のレコードにリンクされている可能性もあるが、潜在的なリンクを明確にするには十分な情報がない。 以下により可視化できる。 linker.evaluation.unlinkables_chart() 横軸は「マッチウェイトの閾値」。縦軸は「リンクするのに十分な情報が含まれないレコード」の割合。 マッチウェイト閾値=6.11ぐらいのところを見ると、入力データセットのレコードの約1.3%が リンクできないことが示唆される。 訓練済みモデルを使って未知データのマッチウェイトを予測 上で構築した推定モデルを使用し、どのペア比較が一致するかを予測する。 内部的には以下を行うとのこと。 blocking_rules_to_generate_predictionsの少なくとも1つと一致するペア比較を生成 Comparisonで指定されたルールを使用して、入力データの類似性を評価 推定された一致重みを使用し、要求に応じて用語頻度調整を適用して、最終的な一致重みと一致確率スコアを生成 df_predictions = linker.inference.predict(threshold_match_probability=0.2) df_predictions.as_pandas_dataframe(limit=1) > Blocking time: 0.88 seconds > Predict time: 1.91 seconds > > -- WARNING -- > You have called predict(), but there are some parameter estimates which have neither been estimated or > specified in your settings dictionary. To produce predictions the following untrained trained parameters will > use default values. > Comparison: \'first_name_surname\': > u values not fully trained records_to_plot = df_e.to_dict(orient=\"records\") linker.visualisations.waterfall_chart(records_to_plot, filter_nulls=False) predictしたマッチウェイトの可視化、数式との照合 predictしたマッチウェイトは、ウォーターフォール図で可視化できる。 マッチウェイトは、モデル内の各特徴量によって一致の証拠がどの程度提供されるかを示す中心的な指標。 (lambda)は無作為抽出した2つのレコードが一致する確率。(K=m/u)はベイズ因子。 begin{align} M &= log_2 ( frac{lambda}{1-lambda} ) + log_2 K \\ &= log_2 ( frac{lambda}{1-lambda} ) + log_2 m - log_2 u end{align} 異なる列の比較が互いに独立しているという仮定を置いていて、 2つのレコードのベイズ係数が各列比較のベイズ係数の積として扱う。 begin{eqnarray} K_{feature} = K_{first_name_surname} + K_{dob} + K_{postcode_fake} + K_{birth_place} + K_{occupation} + cdots end{eqnarray} マッチウェイトは以下の和。 begin{eqnarray} M_{observe} = M_{prior} + M_{feature} end{eqnarray} ここで begin{align} M_{prior} &= log_2 (frac{lambda}{1-lambda}) \\ M_{feature} &= M_{first_name_surname} + M_{dob} + M_{postcode_fake} + M_{birth_place} + M_{occupation} + cdots end{align} 以下のように書き換える。 begin{align} M_{observe} &= log_2 (frac{lambda}{1-lambda}) + sum_i^{feature} log_2 (frac{m_i}{u_i}) \\ &= log_2 (frac{lambda}{1-lambda}) + log_2 (prod_i^{feature} (frac{m_i}{u_i}) ) end{align} ウォーターフォール図の一番左、赤いバーは(M_{prior} = log_2 (frac{lambda}{1-lambda}))。 特徴に関する追加の知識が考慮されていない場合のマッチウェイト。 横に並んでいる薄い緑のバーは (M_{first_name_surname} + M_{dob} + M_{postcode_fake} + M_{birth_place} + M_{occupation} + cdots)。 各特徴量のマッチウェイト。 一番右の濃い緑のバーは2つのレコードの合計マッチウェイト。 begin{align} M_{feature} &= M_{first_name_surname} + M_{dob} + M_{postcode_fake} + M_{birth_place} + M_{occupation} + cdots \\ &= 8.50w end{align} まとめ 長くなったのでいったん終了。この記事では教師なし確率的名寄せパッケージSplinkを使用してモデルを作ってみた。 次の記事では、作ったモデルを使用して実際に名寄せをしてみる。 途中、DuckDBが楽しいことに気づいたので、DuckDBだけで何個か記事にしてみようと思う。

default eye-catch image.

AirflowでEnd-To-End Pipeline Testsを行うためにAirflow APIを調べてみた話

Airflow自体にDAGの実行結果をテスト(End-To-End Pipeline Tests)する仕組みは無いようで、 以下のような地道な仕組みを自力で作る必要がありそうです。 テストデータを用意する Airflowが提供するAirflow APIを使用してDAGを実行する DAGの終了を待つ 結果をAssertする 他にAirflow CLIも使えそうですが、pythonコードの一部にするならAPIの方が使い勝手が良さそうです。 API仕様書を上から読んでみたので、その感想を書いてみます。 他にもあるのですが、今回の用途に使いそうなものを抜粋しています。 \"読んでみた\"だけなので、誤りがあるかもしれません。概要を理解するぐらいの気持ちで読んでください。 [arst_toc tag=\"h4\"] Airflow API概要 今日時点のAirflow APIのAPI仕様書は以下です。 Airflow API (Stable) (2.10.0) RESTful APIとなっていて、Resourceに対するCRUDをHTTP Methodで表現します。 1つ、update_maskという考え方があります。リソースの値を更新する際、リソースjsonと同時に クエリパラメタで\"変更したい値は何か\"を渡すことで、リソースjsonの該当値のみを更新できます。 resource = request.get(\'/resource/my-id\').json() resource[\'my_field\'] = \'new-value\' request.patch(\'/resource/my-id?update_mask=my_field\', data=json.dumps(resource)) API Authenticationがusername/passwordで雑ですが、 DAGのis_pausedをtrueにするには、以下の通りpatchを叩くようです。 curl -X PATCH \'https://example.com/api/v1/dags/{dag_id}?update_mask=is_paused\' -H \'Content-Type: application/json\' --user \"username:password\" -d \'{ \"is_paused\": true }\' CORSを有効にする必要があります。Enabling CORS 様々なAPI認証が用意されています。API認証はAirflowのauth managerで管理されます。Authentication エラーはRFC7807準拠です。つまり、Unauthenticated、PermissionDenied、BadRequest、NotFound、MethodNotAllowed、NotAcceptable、AlreadyExistsが扱われます。Errors Connections ざっとAPIを眺めていきます。 まずはConnection。順当なCRUDです。patchでupdate_maskが使われます。 コードから一通りConnectionを触れそうです。 Testって何か調べてみました。 デフォルトでdisabledになっていますが、Airflow UI(Connections)から\"Test\"ボタンを押下できます。 Connectionと関連付けられたhookのtest_connection()メソッドを実行するようです。 これと同等の機能が動くようです。 Method Endpoint Overview Response GET /connections List Connection array of objects(ConnectionCollectionItem). POST /connections Create a Connection created connection. GET /connections/{connection_id} Get a connection connection PATCH /connections/{connection_id} Update a connection updated connection DELETE /connections/{connection_id} Delete a connection (Status) POST /connections/test Test a connection (Status) DAG 次はDAG。まずDAG一覧に対する操作。一覧に対してpatchを叩ける様子です。 Method Endpoint Overview GET /dags List DAGs in the database. dag_id_pattern can be set to match dags of a specific pattern PATCH /dags Update DAGs of a given dag_id_pattern using UpdateMask. This endpoint allows specifying ~ as the dag_id_pattern to update all DAGs. New in version 2.3.0 次は個別のDAGに対する操作。 Method Endpoint Overview GET /dags/{dag_id} Get basic information about a DAG.Presents only information available in database (DAGModel). If you need detailed information, consider using GET /dags/{dag_id}/details. PATCH /dags/{dag_id} Update a DAG. DELETE /dags/{dag_id} Deletes all metadata related to the DAG, including finished DAG Runs and Tasks. Logs are not deleted. This action cannot be undone.New in version 2.2.0 GET /dags/{dag_id}/tasks/detail Get simplified representation of a task. GET /dags/{dag_id}/detail Get a simplified representation of DAG.The response contains many DAG attributes, so the response can be large. If possible, consider using GET /dags/{dag_id}. Airflowにおいて、Operatorのインスタンスに\"Task\"という用語が割り当てられています。 つまり、「Operatorに定義した処理を実際に実行すること」が\"Task\"としてモデリングされています。 「\"Task\"をA月B日X時Y分Z秒に実行すること」が、\"TaskInstance\"としてモデリングされています。 あるDAGは、実行日/実行時間ごとの複数の\"TaskInstance\"を保持しています。 以下のAPIにおいて、DAGが保持する\"Task\",\"日付レンジ\"等を指定して実行します。 \"TaskInstance\"を\"Clear(再実行)\"します。また、\"TaskInstance\"の状態を一気に更新します。 Method Endpoint Overview POST /dags/{dag_id}/clearTaskInstances Clears a set of task instances associated with the DAG for a specified date range. POST /dags/{dag_id}/updateTaskInstancesState Updates the state for multiple task instances simultaneously. GET /dags/{dag_id}/tasks Get tasks for DAG. なんだこれ、ソースコードを取得できるらしいです。 Method Endpoint Overview GET /dagSources/{file_token} Get a source code using file token. DAGRun \"Task\"と\"TaskInstance\"の関係と同様に\"DAG\"と\"DAGRun\"が関係しています。 「A月B日X時Y分Z秒のDAG実行」が\"DAGRun\"です。DAGRun。順当な感じです。 新規にトリガしたり、既存のDAGRunを取得して更新したり削除したり、再実行したりできます。 Method Endpoint Overview GET /dags/{dag_id}/dagRuns List DAG runs.This endpoint allows specifying ~ as the dag_id to retrieve DAG runs for all DAGs. POST /dags/{dag_id}/dagRuns Trigger a new DAG run.This will initiate a dagrun. If DAG is paused then dagrun state will remain queued, and the task won\'t run. POST /dags/~/dagRuns/list List DAG runs (batch).This endpoint is a POST to allow filtering across a large number of DAG IDs, where as a GET it would run in to maximum HTTP request URL length limit. GET /dags/{dag_id}/dagRuns/{dag_run_id} Get a DAG run. DELETE /dags/{dag_id}/dagRuns/{dag_run_id} Delete a DAG run. PATCH /dags/{dag_id}/dagRuns/{dag_run_id} Modify a DAG run.New in version 2.2.0 POST /dags/{dag_id}/dagRuns/{dag_run_id}/clear Clear a DAG run.New in version 2.4.0 以下はスキップ.. Method Endpoint Overview GET /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents Get datasets for a dag run.New in version 2.4.0 PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/setNote Update the manual user note of a DagRun.New in version 2.5.0 DAGWarning DAGのimport_errors一覧を返します。 Method Endpoint Overview GET /dagWarnings List Dag Waranings. DAGStats A DAG Run status is determined when the execution of the DAG is finished. The execution of the DAG depends on its containing tasks and their dependencies. The status is assigned to the DAG Run when all of the tasks are in the one of the terminal states (i.e. if there is no possible transition to another state) like success, failed or skipped. The DAG Run is having the status assigned based on the so-called “leaf nodes” or simply “leaves”. Leaf nodes are the tasks with no children. There are two possible terminal states for the DAG Run: success if all of the leaf nodes states are either success or skipped, failed if any of the leaf nodes state is either failed or upstream_failed. Method Endpoint Overview GET /dagStats List Dag statistics. ImportError Airflow Best PractiveのTesting a DagにDAGのテスト観点に関する記述が(サラッと)書かれています。 まず、DAGは普通のpythonコードなので、pythonインタプリタで実行する際にエラーが起きないことを確認すべし、とのことです。 以下の実行により、未解決の依存関係、文法エラーをチェックします。もちろん、どこで実行するかが重要なので、DAG実行環境と合わせる必要があります。 Airflow APIにより、このレベルのエラーがDAGファイルにあるか確認できるようです。 $ python your-dag-file.py Method Endpoint Overview GET /importErrors List import errors. GET /importErrors/{import_error_id} Get an import error. Variables DAGに記述したくないCredentials等を管理する仕組みで、Airflow UIからポチポチ操作すると作れます。 Variableはkey-valueそのままです。DAGからkeyを指定することで参照できます。 Airflow APIからもVariableをCRUDできます。 Method Endpoint Overview GET /variables List variables.The collection does not contain data. To get data, you must get a single entity. POST /variables Create a variable. GET /variables/{variable_key} Get a variable by key. PATCH /variables/{variable_key} Update a variable by key. DELETE /variables/{variable_key} Delete a variable by key. まとめ RESTfulAPIが用意されているということは、内部のオブジェクトをCRUD出来るということなのだろう、 という推測のもと、Airflow APIのAPI仕様書を読んで感想を書いてみました。 Airflowの概念と対応するリソースはAPIに出現していて、End-To-End Pipeline Testを書く際に、Assert、実行制御を記述できそうな気持ちになりました。 Assert、実行制御、だけなら、こんなに要らない気もします。 API呼び出し自体の煩雑さがあり、Testの記述量が増えてしまうかもしれません。 以下の記事のようにwrapperを書く必要があるかもしれません。 https://github.com/chandulal/airflow-testing/blob/master/src/integrationtest/python/airflow_api.py DAGの入力側/出力側Endに対するファイル入出力は別で解決が必要そうです。 「API仕様書を読んでみた」の次の記事が書けるときになったら、再度まとめ記事を書いてみようと思います。

default eye-catch image.

CustomOperatorのUnitTestを理解するためGCSToBigQueryOperatorのUnitTestを読んでみた話

未知の連携先との入出力を行う際、CustomOperatorを作るという解決策があります。 CustomOperatorを自作した場合、そのテストをどう書くか、という問題が発生します。 ビルトインのGCSToBigQueryOperatorがどうテストされているかを読むと、雰囲気がわかりました。 UnitTestコードを読んで見ましたので、本記事で感想を書いてみます。 https://github.com/apache/airflow/blob/main/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py 前提となる知識 Airflowのhookについて理解する必要がありました。 フワッとしていますが、コードを読んで使われ方をながめているとイメージが湧いてきます。 hook しばしば外部からデータを入力したり外部へデータを出力する必要が出てくる。 外部と接続する際にcredentialsを保管し使用する必要があるが、 Airflowはconnectionという概念のオブジェクトを用意している。 connection は conn_id により識別される。Airflow UIやCLIから管理できる。 connectionを直接操作するようなlow-levelコードを書くこともできるが、 煩雑にならないよう、外部リソース毎にhookというhigh-levelインターフェースが用意されている。 Connections & Hooks pythonのunittestも理解する必要がありました。 unittestのmockについて以下が参考になりました。 [clink implicit=\"false\" url=\"https://qiita.com/satamame/items/1c56e7ff3fc7b2986003\" imgurl=\"https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcQqktBhX0kv-C4zk1lu0D8T0ExDUFQdNdu9dQ&s\" title=\"Python の unittest の mock\" excerpt=\"Python の unittest を使っていて、mock が何をするものかは分かっているけど、まだちょっと得体が知れない、怖い、という段階があると思います。この段階を克服するために、何も知らない状態から徐々に mock を理解するためのステップを作りたいと思いました。対象は Python 3.x です。\"] UnitTestを読んでいく TestGCSToBigQueryOperatorというクラスにUnitTestメソッドの実装例が書かれています。 python built-inのテストパッケージであるunittestが使用されています。 @mock.patchデコレータを使用しBigQueryHookをpatchしています。 BigQueryHookのmockインスタンスがhookとして渡ります。 hookのreturn_value, side_effectを差し替えてGCSToBigQueryOperatorインスタンスを実行します。 insert_job(),generate_job_id(),split_table_name(),get_job()の差し替えを行なっています。 メソッドの階層をドット(.)で繋いでより深い場所を差し替えられる様子です。 unittestを書いた人はコードが何に依存しているか分かるので、知識に基づいて依存しているものをmockします。 import json from unittest import mock from unittest.mock import MagicMock, call TASK_ID = \"test-gcs-to-bq-operator\" TEST_EXPLICIT_DEST = \"test-project.dataset.table\" WRITE_DISPOSITION = \"WRITE_TRUNCATE\" SCHEMA_FIELDS = [ {\"name\": \"id\", \"type\": \"STRING\", \"mode\": \"NULLABLE\"}, {\"name\": \"name\", \"type\": \"STRING\", \"mode\": \"NULLABLE\"}, ] MAX_ID_KEY = \"id\" JOB_PROJECT_ID = \"job-project-id\" TEST_BUCKET = \"test-bucket\" TEST_SOURCE_OBJECTS = \"test/objects/test.csv\" DATASET = \"dataset\" TABLE = \"table\" GCS_TO_BQ_PATH = \"airflow.providers.google.cloud.transfers.gcs_to_bigquery.{}\" job_id = \"123456\" hash_ = \"hash\" REAL_JOB_ID = f\"{job_id}_{hash_}\" class TestGCSToBigQueryOperator: @mock.patch(GCS_TO_BQ_PATH.format(\"BigQueryHook\")) def test_max_value_external_table_should_execute_successfully(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=REAL_JOB_ID, error_result=False), REAL_JOB_ID, ] hook.return_value.generate_job_id.return_value = REAL_JOB_ID hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) hook.return_value.get_job.return_value.result.return_value = (\"1\",) operator = GCSToBigQueryOperator( task_id=TASK_ID, bucket=TEST_BUCKET, source_objects=TEST_SOURCE_OBJECTS, destination_project_dataset_table=TEST_EXPLICIT_DEST, write_disposition=WRITE_DISPOSITION, schema_fields=SCHEMA_FIELDS, max_id_key=MAX_ID_KEY, external_table=True, project_id=JOB_PROJECT_ID, ) \"基づく知識\"は第三者には理解不能ですが、GCSToBigQueryOperator.pyを読むと理由がわかります。 GCSToBigQueryOperatorのexecute(self, context:Context)を読むと、 先頭でBigQueryHookのインスタンスを取得し、BaseOperator由来のself.hookに設定しているようです。 generate_job_id()により、job_idを取得しています。 _use_existing_table()内で、split_table_name()により,ProjectID,Dataset,Tableを取得しています。 mockしたjob_idが既に存在している場合、get_job()で既存を取得しています。 def execute(self, context: Context): hook = BigQueryHook( gcp_conn_id=self.gcp_conn_id, location=self.location, impersonation_chain=self.impersonation_chain, ) self.hook = hook self.source_format = self.source_format.upper() job_id = self.hook.generate_job_id( job_id=self.job_id, dag_id=self.dag_id, task_id=self.task_id, logical_date=context[\"logical_date\"], configuration=self.configuration, force_rerun=self.force_rerun, ) さて、Assertは以下のように書かれています。 GCSToBigQueryOperatorは、Source(GCS)から.csv等を読み込みDest(BigQuery)へ配置するものです。 Destの然るべき場所にテーブルが作られ、値が入ります。 execute()すると、max_id_keyで指定したカラムの最大値が戻るようです。 \"test-bucket\"に配置した\"test/objects/test.csv\"は\"id\",\"name\"の2列からなるCSVで、 例えば\"id\"=\"1\", \"name\"=\"hoge\"ならば、\"id\"列の最大値である1が戻るため、1をassertすればOKです。 result = operator.execute(context=MagicMock()) assert result == \"1\" これだと、分岐をだいぶすっ飛ばしているので、だいぶ薄いカバレッジになるかと思います。 まとめ GCSToBigQueryOperatorのUnitTestを読んでみました。分かってしまうと普通のUnitTestでした。 Source to Destのパターンはだいたい似たようになるのかも、なので、 作るUnitTestも似たような感じになるのかもしれません。

default eye-catch image.

GoogleによるAirflow DAG実装のベスプラ集を読んでみた – その1

GoogleによるフルマネージドAirflow環境であるCloud Composerを使う必要があり、 いそぎでAirflow+Cloud Composerをキャッチアップすることになりました。 Googleが公開するベスプラ集があることを知り、読んでみることにしました。 Cloud Composerと題されていいますが、ほぼAirflowと読み替えて良いのかなと思います。 書かれているのは、少し基本的なシナリオだと思います。 経験に裏付けられたゴリゴリの集合知、というものはタダでは手に入らないのだろうと思います。 スタート地点に立つ際の道しるべ、ぐらいの気持ちです。 おそらく一緒に使うシナリオが多いであろう、データ変換ツールのdbtと競合するものがあります。 大構造としてはAirflow DAGの下にdbt DAGが来るため、Airflow DAGのベスプラを実現する前提で dbt DAGを書いていくものと考えていました。これだけだとバッティングすると思います。 ウェアハウスとは切り離されています。特にBigQueryを前提にするならもう少し踏み込んだ内容と なるはずだと思いますが、ちょっと書かれていないようです。 いったん半分くらい読んでみたので読書感想文を書いてみました。 [clink implicit=\"false\" url=\"https://cloud.google.com/blog/ja/products/data-analytics/optimize-cloud-composer-via-better-airflow-dags\" imgurl=\"https://www.gstatic.com/pantheon/images/welcome/supercloud.svg\" title=\"Airflow DAG の改良による Cloud Composer の最適化\" excerpt=\"このガイドには、Apache Airflow DAG を作成する際に一般的に適用できる、作業項目のチェックリストが掲載されています。これらのチェック項目は、Google Cloud とオープンソース コミュニティが判断したベスト プラクティスに沿ったものとなっています。一連の高パフォーマンスの DAG によって Cloud Composer の動作が最適化され、標準化された作成方法によって、デベロッパーが数百個、あるいは数千個の DAG でも管理できるようになります。チェック項目のそれぞれが、Cloud Composer 環境と開発プロセスにメリットをもたらします。\"] [arst_toc tag=\"h4\"] はじめに ファイル名を標準化します Workflowの特徴を個別に表現するだけでは不十分で、ファイル名が含む部分文字列がサブ機能や属性のインデックスとなっていて欲しい。さらに、ファイル名から機能概要を逆引き推測できたら便利。 作成した DAG ファイルのコレクションを他のデベロッパーが容易に参照できるようにするためです。例: team_project_workflow_version.py DAG は決定的でなければなりません 入力となるデータが同じであれば、出力は常に同じであるべき、という観点だと思う。 例えば、入力となるデータは同じであっても、実行時間に依存して処理を行ってしまうと、 出力が時間依存となる。テストが無茶苦茶大変になるだろうなと思う。 Airflow DAG単体であれば、そう理解に難しくないポイントだとは思う。 しかし、dbt DAGを含めると一気に縛りがキツくなると思う。 特定の入力によって常に同じ出力が生成される必要があります DAG はべき等でなければなりません 大雑把に書けば、ある操作を1回行っても数回行っても結果が同じであることを言う。 これを実現する仕組みの選択は結構悩ましく、「追加する範囲をいったん削除追加する」が簡単。 しかし、この方法だと無駄なスキャン量が発生する。 dbtを使用する場合、incremental modelがべき等性担保の手段として挙げられることが多いが、 実際にべき等性を担保するには考慮しないといけないことがある。 こちらの記事(dbtで「Incremental」を使わずに冪等性を担保する方法について)が詳しい。 DAG を何度トリガーしても、毎回同じ効果 / 結果が得られなければなりません 例えば、以下のように書けば、入力テーブルが変わらない限りべき等となる。 これを行うには、入力テーブルに「ロード日時」といったメタデータが必要となる。 {{ config( materialized=\"incremental\" ) }} ・・・ {%- if is_incremental() %} WHERE ORDERDATE = TO_DATE(\'{{ var(\'load_date\') }}\') {%- endif %} タスクはアトミック、かつ、べき等でなければなりません ちょっと何言っているかわからない書きっぷり。データベースのACID特性のAtomic性を意識する。 ある操作が一連の処理の完了によって達成される場合、部分的に処理の一部が成功する、という 状態になってしまってはいけない。全ての処理が成功したか、全ての処理が失敗したか、のどちらか、 になっていないといけない。 タスクごとに、他のオペレーションとは独立して再実行できる 1つのオペレーションを処理するようにします。タスクがアトミックな場合、そのタスクの一部が成功したことは、タスク全体が成功したことを意味します。 可能な限り単純な DAG にします ちょっと一般的すぎて良くわからない。 「スケジューリングのコスト」って、「実行のコスト」よりもだいぶ小さいんじゃなかろうか、と思うが、 それでも意識しないといけないのだろうか。 ネストされたツリー構造は、そもそも理解しづらくて避けるべきだろう、とは思う。 タスク間の依存関係が少ない単純な DAG にすると、オーバーヘッドが少なくなるため、スケジューリングのパフォーマンスが向上する傾向があります。一般的に、多数の依存関係がある深くネストされたツリー構造よりも線形構造(例: A->B->C)にしたほうが、効率的な DAG になります。 Python の docstring 規則に従って、各関数のファイルの上部にドキュメントを記述してください AirflowはPythonで書けることが最大の特徴なので、そのメリットを発揮するため、docstringでコメント書けよと。 Python の docstring 規則は、他のデベロッパーやプラットフォーム エンジニアが Airflow DAG を理解するために役立ちます。 関数と同様に BashOperator のドキュメントも作成するようにしてください。DAG で bash スクリプトを参照している場合、スクリプトの目的を記したドキュメントがないと、このスクリプトに詳しくないデベロッパーにはトラブルシューティングが困難です。 DAG の作成を標準化する default_args にオーナーを追加します。 タスクを得る(Operatorをインスタンス化する)際に、各Operatorのコンストラクタに引数を与えるが、 複数のOperatorに渡す引数を共通化したい場合には、default_argsをDAG()に与える。 こうすると、Operatorにdefault_argsで設定した引数を与えたことになる。 各Operatorに引数を与えると、defautl_argsをオーバーライドする動作となる。 過去の公式でdefault_argsは、task_id と owner が mandatory(必須) であるとされている。 これについて、Why is \'owner\' a mandatory argument for tasks and dags? という記事がある。 それに続くPRは More detail on mandatory task arguments であり、mandatoryの根拠を聞いている。 歴史的な理由による、で片付いているな。ベスプラではownerに実装者のメアドなどを書けという。 実装担当者を明らかにせよ、という話であればcomitterを見れば良いだけで、ちょっと意味不明。 mandatoryなので、何か入れないといけないなら、とりあえず実装者のメアドを入れておけ、ということか。 import pendulum with DAG( dag_id=\'my_dag\', start_date=pendulum.datetime(2016, 1, 1, tz=\"UTC\"), schedule_interval=\'@daily\', catchup=False, default_args={\'owner\': \'hoge@ikuty.com\'}, ) as dag: op = BashOperator(task_id=\'dummy\', bash_command=\'Hello World!\') print(op.retries) # 2 dag = DAG() ではなく with DAG() as dag を使用します。 Pythonのwith文の仕様。コンテキストマネージャという言う。 try... except... finally をラップするため、リソースの確保と対応する解放が必ず行われる。 with DAG(...):文の下のインデントの中では、各Operatorのコンストラクタにdagインスタンスを 渡さなくてよくなる。 すべてのオペレーターまたはタスクグループに DAG オブジェクトを渡す必要がなくなるようにします。 DAG ID 内にバージョンを設定します。 以下とのこと。あぁ..バージョニングが実装されていないので手動でバージョニングを行うべし、と。 AirflowはDAGをファイルIDで管理しているため、ファイルIDを変更するとUI上、別のDAGとして扱われるよう。 積極的にDAG IDを変更して、Airflow UIに無駄な情報を出さないようにする、というアイデア。 DAG 内のコードを変更するたびにバージョンを更新します。 こうすると、削除されたタスクログが UI に表示されなくなることや、ステータスのないタスクが古い DAG 実行に対して生成されること、DAG 変更時の一般的な混乱を防ぐことができます。 Airflow オープンソースには、将来的にバージョニングが実装される予定です。 DAG にタグを追加します。 公式はこちら。Add tags to DAGs and use it for filtering in the UI 単にUI上の整理のために留まらず、処理の記述に積極的に使うことができる様子。 これを無秩序に使うと扱いづらくなりそうなので、使う場合は用途を明確にしてから使うべきかと思う。 1. デベロッパーがタグをフィルタして Airflow UI をナビゲートできるようにします。 2. 組織、チーム、プロジェクト、アプリケーションなどを基準に DAG をグループ化します。 DAG の説明を追加します。 唐突で非常に当たり前なのだが、あえて宣言することが大事なんだろうと思う。 他のデベロッパーが自分の DAG の内容を理解できるようにします。 作成時には DAG を一時停止します。 なるほど。 こうすると、誤って DAG が実行されて Cloud Composer 環境の負荷が増すという事態を回避できます。 catchup=Falseに設定して自動キャッチアップによるCloud Composer環境の過負荷を避けます。 まず、catchupの前に、Airflowの実行タイミングが直感的でなさそう。 こちらの記事がとても参考になった。【Airflow】DAG実行タイミングを改めて纏めてみた DAGの実行タイミングはstart_dateとschedule_intervalを利用して計算される。 重要なポイントはschedule_intervalの終了時にDAGが実行される、という点。 また、schedule_intervalはウインドウ枠を表している。 例えば 0 11 * * * であれば、毎日11:00-翌日11:00という時間の幅を表す。 start_date=7月15日、schedule_interval=0 11 * * * のとき、 7月15日 11:00から 7月16日11:00までの期間が終わった後、DAGが開始される。 DAGをデプロイする際、デプロイ日時よりも古いstart_dateを設定することができる。 このとき、start_dateからデプロイ日時までの間で、本来完了しているはずだが実行していない schedule_intervalについてDAGを実行する機能がcatchup。 catchup=Trueとすると、これらのschedule_intervalが全て再実行の対象となる。 一方、catchup=Falseとsるうと、これらのうち、最後だけが再実行の対象となる。 (Falseとしても、最後の1回は再実行される) 過去のデータを自動投入するとか、危ないので、確認しながら手動実行すべきだと思う。 もし本当にcatchupするのであれば、計画的にFalseからTrueにすべきだろうし、 その時は負荷を許容できる状況としないといけない。 DAGが完了せずにCloud Composer環境のリソースが保持されることや、再試行時に競合が引き起こされることのないよう、 dagrun_timeout を設定します DAG、タスク、それぞれにタイムアウトプロパティが存在する。それぞれ理解する必要がある。 DAGタイムアウトはdagrun_timeout、タスクタイムアウトはexecution_timeout。 以下が検証コード。job1のexecution_timeout引数をコメントアウトしている。 コメントアウトした状態では、dagrun_timeoutがDAGのタイムアウト時間となる。 検証コードにおいては、タイムアウト時間が15秒のところ、タスクで20秒かかるのでタイムアウトが起きる。 execution_timeout引数のコメントアウトを外すと、DAGのタイムアウト時間が タスクのタイムアウト時間で上書きされ30秒となる。 タスクで20秒かかってもタイムアウトとならない。 from datetime import timedelta from airflow.utils.dates import days_ago from airflow import DAG from airflow.operators.python import PythonOperator def wait(**context): time.sleep(20) defalut_args = { \"start_date\": days_ago(2), \"provide_context\": True } with DAG( default_args=defalut_args, dagrun_timeout=timedelta(seconds=15), ) as dag: job1 = PythonOperator( task_id=\'wait_task1\', python_callable=wait, # execution_timeout=timedelta(second=30) ) ベスプラの言うところは、ちゃんとタイムアウトを設定しろよ、ということだと思う。 インスタンス化で DAG に引数を渡し、すべてのタスクにデフォルトで同じ start_date が設定されるようにします Airflowでは、Operatorのコンストラクタにstart_dateを与えられるようになっている。 同一DAGに所属するタスクが異なるstart_dateを持つ、という管理が大変なDAGを作ることも出来てしまう。 基本的には、DAGにstart_dateを渡して、タスクのデフォルトを揃えるべき、だそう。 DAG では静的な start_date を使用します。 これがベスプラになっているのはかなり助かる。 動的な start_date を使用した場合、誤った開始日が導き出され、失敗したタスク インスタンスやスキップされた DAG 実行を消去するときにエラーが発生する可能性があります。 retries を、DAG レベルで適用される default_args として設定します。 retriesについても、start_dateと同様にDAGレベルで default_args として設定するそう。 なお、タスクのリトライに関する設定には以下のようなものがある。 retries (int) retry_delay (datetime.timedelta) retry_exponential_backoff (bool) max_retry_delay (datetime.timedelta) on_retry_callback (callable) retries (int)は、タスクが\"失敗\"となる前に実行されるリトライ回数。 retry_delay (datetime.timedelta)はリトライ時の遅延時間。 retry_exponential_backoff (bool)はリトライ遅延での指数関数的後退アルゴリズムによるリトライ間隔の待ち時間を増加させるかどうか max_retry_delay (datetime.timedelta)はリトライ間の最大遅延間隔 on_retry_callback (callable)はリトライ時のコールバック関数 適切な再試行回数は 1~4 回です。再試行回数が多すぎると、Cloud Composer 環境に不要な負荷がかかります。 具体的に retries を何に設定すべきか、について書かれている。 ここまでのまとめ ここまでのステートメントがコードになっている。 わかりやすい。 import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator # default_args 辞書を定義して、DAG のデフォルト パラメータ(開始日や頻度など)を指定する default_args = { \'owner\': \'me\', \'retries\': 2, # 最大再試行回数は 2~4 回にすること \'retry_delay\': timedelta(minutes=5) } # `with` ステートメントを使用して DAG オブジェクトを定義し、一意の DAG ID と default_args 辞書を指定する with DAG( \'dag_id_v1_0_0\', # ID にバージョンを含める default_args=default_args, description=\'This is a detailed description of the DAG\', # 詳しい説明 start_date=datetime(2022, 1, 1), # 静的な開始日 dagrun_timeout=timedelta(minutes=10), # この DAG に固有のタイムアウト is_paused_upon_creation= True, catchup= False, tags=[\'example\', \'versioned_dag_id\'], # この DAG に固有のタグ schedule_interval=None, ) as dag: # BashOperator を使用してタスクを定義する task = BashOperator( task_id=\'bash_task\', bash_command=\'echo \"Hello World\"\' )

default eye-catch image.

Snowpark Container Services上でWebアプリ(FastAPI/React/TypeScript)を動かしてみた

シンプルな Multi-Container App を動かしている以下の記事にインスパイアされてみました。 以下の記事では、Docker networkを前提にフロントがサーバの名前解決を行っています。 これをデプロイすると、ブラウザで動くフロントコードがサーバの名前を解決できません(SPCS無関係)。 リバースプロキシを挟んでプライベートなダウンストリームにAPIを配置する方法が良さそうです。 今回の記事はSPCSの動作確認をすることが目的なので凝ったことはせず、 ViteをそのままデプロイしてProxyで解決してみたのでご紹介します。 [clink implicit=\"false\" url=\"https://medium.com/@maseedilyas9848/snowflake-container-mastery-step-by-step-deployment-of-your-multi-container-app-with-snowpark-211682514851\" imgurl=\"https://miro.medium.com/v2/resize:fit:1400/format:webp/1*t7s-Rl6F4BBV-yYs-ovODQ.png\" title=\"Snowflake Container Mastery: Step-by-Step Deployment of Your Multi-Container App with Snowpark Container Services\" excerpt=\"The buzz around town is all about Snowflake’s latest product feature, “Snowpark Container Services” and the excitement is real. Now, with the feature hitting public preview in various AWS regions, this blog dives into the nitty-gritty of what container services bring to the table. Join me as we explore what makes this feature tick and unravel the steps to deploy a multi-container app within Snowflake. Let’s break it down!\"] [arst_toc tag=\"h4\"] SPCSのアーキテクチャ Image Registryはイメージリポジトリです。AWSのECR、AzureのACRと似た感じで操作できます。 Container Serviceはデプロイメントの単位で、1個以上のコンテナをデプロイできます。 Compute Poolは計算資源で、複数のContainer Serviceが共有します。 Serviceは基本的にはPrivateなリソースとなりますが、SpecでEndpointsを定義することで、 Publicリソースとすることができます。自動的に80にmapされます。 Serviceには以下のフォーマットでDNS名が付きます。 Service同士は以下のDNS使ってプライベート通信できます。 同一DB、Schema内に作成したServiceは先頭のServiceNameが異なるだけとなりますが、 その場合に限り、ServiceNameだけで互いの名前解決ができます。 Service間連携については、公式で\"Service-to-Service\"というパターンで紹介されています。 <Service Name>-<Schema Name>-<DB Name>.snowflakecomputing.internal 1つのService内に複数のコンテナを配置することができます。 同一Service内で各コンテナ内の通信したいと考えたのですが、方法を見つけることができませんでした。 (出来ないはずはなさそうで方法はあるのかもしれません..) 今回作るもの フロント側、サーバ側の2つのコンテナを、それぞれ別々のServiceとしてデプロイします。 フロント側をPublic、サーバ側はPrivateとします。概要は以下の通りです。 フロント側 Vite React/TypeScript ReactからのAPIリクエストは一旦ViteのProxyで受けて、ProxyからAPIに流す サーバ側にGETリクエストして応答を表示するだけ サーバ側 uvicorn FastAPI poetry GETリクエストを受けて\"Hello World\"をJSONで返すだけ 思いっきり開発用な感じですが、SPCSの動作確認が目的ですのでこれでいきます。 ローカルで動作確認をして、SPCSにデプロイします。ファイル構成は以下の通りです。 $ tree . -I node_modules -I __pycache__ . ├── api │   ├── Dockerfile │   ├── api.py │   ├── app.py │   ├── main.py │   └── pyproject.toml ├── compose.yml └── front ├── Dockerfile ├── entry.sh ├── index.html ├── package-lock.json ├── package.json ├── src │   └── hello.tsx ├── tsconfig.json ├── tsconfig.node.json └── vite.config.ts compose.yml サーバ側、フロント側の2つのコンテナが、サーバ側->フロント側の順に起動するように書きます。 それぞれ、8080、5173 で待つようにします。 services: api: build: api volumes: - ./api:/app ports: - 8080:8080 front: build: front volumes: - ./front:/app ports: - 5173:5173 depends_on: - api サーバ側 Snowflakeの公式のチュートリアルだとFlaskが使われています。 今回は、最近使い始めたFastAPIを使ってAPIサーバを立ててみようと思います。 FlaskとFastAPIの比較はこちらが詳しいです。 FastAPIの特徴はPydanticによるデータ検証とAsyncI/O。TypeScriptのように型チェックできます。 パッケージマネージャにはpoetryを使います。デファクトが無いPythonのパッケージ管理界隈で npmやcomposer的な使い勝手が提供されます。 FastAPIの公式では、Python用Webサーバのuvicornを使ってホストされています。 uvicornでFastAPIを動かすコンテナを1個立てていきます。 Dockerfile 最新のPythonのイメージにpoetryをインストールします。 公式がインストーラを配布していて公式の手順通りに叩けばインストールできます。 公式のガイドの通り、POETRY_HOME=/etc/poetry とします。 Installation with the official installer FROM python:3.12 # 公式の通り /etc/poetry にインストールする ENV POETRY_HOME=/etc/poetry RUN curl -sSL https://install.python-poetry.org | python - ENV PATH $POETRY_HOME/bin:$PATH WORKDIR /app COPY . . RUN poetry install CMD [\"python\",\"main.py\"] pyproject.toml バニラのPythonとpyproject.tomlだけで依存関係を考慮したパッケージ管理が出来ますが、 要はnpmやbundle,composer的な使い勝手に寄せたパッケージ管理に対する需要があります。 poetry用の依存関係を書いていきます。fastapi、uvicornの現在(2/16)の最新を指定します。 [tool.poetry] name = \"test\" [tool.poetry.dependencies] python = \"^3.12\" fastapi = \"^0.109.2\" uvicorn = \"^0.27.1\" api.py [GET] /hello に対して Hello World 的な JSON を返す FastAPI の Hello Worldです。 後のapp.pyで FastAPIインスタンスに紐付けます。app.pyと分離することでロジックを分離できます。 from fastapi import APIRouter router = APIRouter() @router.get(\"/hello\") async def hoge(): return {\"result\":\"Hello World\"} app.py FastAPIのHello Worldコードです。Dockerfileから開始します。 from api import router as api_router from fastapi import FastAPI app = FastAPI() app.include_router(api_router, prefix=\"/api\") main.py pythonコードからuvicornを起動します。uvicorn.runの公式の仕様はこちら。 第1引数の\"app:app\"は\"app\"モジュールの中の\"app\"オブジェクトという表記です。 app.pyに記述したFastAPI()のインスタンスappを指します。stringで渡す必要があります。 hostは\"0.0.0.0\"を指定します。なぜ\"127.0.0.1\"でないのかはこちらが参考になります。 今回はPort=8080で起動します。reload=Trueとすると、HotReload機能が有効になります。便利。 import uvicorn if __name__ == \"__main__\": uvicorn.run( \"app:app\", host=\"0.0.0.0\", port=8080, reload=True, ) 起動してみる docker compose up して http://localhost:8080/docs を開くと以下が表示されます。 ちゃんとJSONでHello Worldが戻りました。 フロント側 VueとReactの開発用に使われるローカル開発用のサーバ Vite をホストするコンテナを立てます。 Viteは Vue.jsの開発者Evan You氏が開発したJavaScript/TypeScriptで、ヴィートと読み、 フランス語で\"素早い\"という意味だそう。(webpackのように)リソースバンドルが不要で起動が速い。 (Laravelも9.xでwebpackを捨ててViteになってた..) 素の状態でTypeScriptを扱えるため、すぐにTypeScriptを書き始められる特徴があります。 Dockerfile nodeのrelease scheduleはこちら。 2/17のnodeのActiveLTSのMajor Versionは20で、2026-04-30がEnd of lifeとなっています。 これを使いたいので node:20 を指定します。 FROM node:20 WORKDIR /app COPY . . RUN npm install ENTRYPOINT [ \"./entry.sh\" ] entry.sh Dockerfile の ENTRYPOINT で npm run dev するだけのshです。 #!/bin/bash npm run dev package.json npm create vite@latest で Prjディレクトリ内に様々なファイルが作られます。 package.jsonも作られます。 Hello World で必要なもの以外を削ってみました。 npm install後、npm run devで viteを実行します。 TS用のconfigは別です。 他に生成されるpackage-lock.jsonが必要ですが省略します。 { \"name\": \"front\", \"private\": true, \"version\": \"0.0.0\", \"scripts\": { \"dev\": \"vite\" }, \"dependencies\": { \"react\": \"^18.2.0\", \"react-dom\": \"^18.2.0\" }, \"devDependencies\": { \"@vitejs/plugin-react\": \"^4.2.1\" } } tsconfig.json viteのPrj生成で自動生成されるTS用のconfigファイルです。 手をつけずに配置します。 { \"compilerOptions\": { \"target\": \"ES2021\", \"useDefineForClassFields\": true, \"lib\": [\"ES2021\", \"DOM\", \"DOM.Iterable\"], \"module\": \"ESNext\", \"skipLibCheck\": true, /* Bundler mode */ \"moduleResolution\": \"bundler\", \"allowImportingTsExtensions\": true, \"resolveJsonModule\": true, \"isolatedModules\": true, \"noEmit\": true, \"jsx\": \"react-jsx\", /* Linting */ \"strict\": true, \"noUnusedLocals\": true, \"noUnusedParameters\": true, \"noFallthroughCasesInSwitch\": true }, \"include\": [\"src\"], \"references\": [{ \"path\": \"./tsconfig.node.json\" }] } tsconfig.node.json viteのPrj生成で自動生成されるTS用のconfigファイルです。 手をつけずに配置します。 { \"compilerOptions\": { \"composite\": true, \"skipLibCheck\": true, \"module\": \"ESNext\", \"moduleResolution\": \"bundler\", \"allowSyntheticDefaultImports\": true }, \"include\": [\"vite.config.ts\"] } vite.config.ts viteのPrj生成で自動生成されるvite用のconfigファイルです。 設定ファイルが.tsなところが凄いです。普通にimport文を書けます。 上のuvicornの起動で127.0.0.1ではなく0.0.0.0を指定したのと同様に、 viteも127.0.0.1ではなく0.0.0.0で待たせる必要があります。 serverオプションのhostにtrueを設定すると、0.0.0.0となります。公式 FastAPIの同一パスと対応するProxyを設定します。 以下で、server.proxy.api.target は SPCS上のAPIコンテナのPrivateエンドポイント を表します。 DNS名はサービス単位で作られます。本来長いFQDNを指定する必要がありますが、 同一スキーマに作られたサービスに限り、サービス名だけで解決できるようです。 DNS名はアンダースコア(_)がハイフン(-)に置き換わります。6時間くらいハマりました.. 後で ikuty_api_service サービスを作りますが、ikuty-api-serviceを 使います。 詳細は以下を参照してください。 Service-to-service communications import { defineConfig } from \'vite\' import react from \'@vitejs/plugin-react\' export default defineConfig({ plugins: [react()], server: { host: true, proxy: { \"/api\": { target: `http://ikuty-api-service:8080/`, changeOrigin: true } } }, }) index.html Reactのコンポーネントを表示するガワとなるhtmlです。 <!DOCTYPE html> <html> <head> <meta charset=\"UTF-8\" /> <meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\" /> <script type=\"module\" src=\"/src/hello.tsx\" defer></script> </head> <body> <div id=\"root\">Waiting...</div> </body> </html> src/hello.tsx ようやくHello WorldするReactコンポーネントの本体です。 画面にはvalueというStateを表示しています。APIのURLは上記の通りproxyとします。 今回作成した/api/hello APIの応答を受けた後、setValueによりStateを更新します。 import React from \'react\' import { useState } from \'react\' import ReactDOM from \'react-dom/client\' const App = () => { const [value, setValue] = useState(\'\') const url = \'/api/hello\' fetch(url,{}) .then(res=>res.json()) .then(data=>setValue(data[\'result\'])) return ( {value},{} ) } ReactDOM.createRoot(document.getElementById(\'root\')!).render( ) 起動してみる docker compose up すると、ほとんど一瞬でviteが起動します。 http://localhost:5173 を開きます。 Waiting...という表示が一瞬で Hello World に書き変わります。 ロールの作成 SPCSの各リソースの作成に必要な権限はこちらにあります。 ゴリ押ししただけなので間違っている可能性大です.. 行ったり来たりしたので足りないものがあるかもしれません。 use role ACCOUNTADMIN; CREATE ROLE IKUTY_CONTAINER_USER_ROLE; GRANT ROLE IKUTY_CONTAINER_USER_ROLE TO ROLE ACCOUNTADMIN; GRANT USAGE ON DATABASE IKUTY_DB TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT USAGE ON SCHEMA IKUTY_DB.PUBLIC TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT CREATE IMAGE REPOSITORY ON SCHEMA T_IKUTA_DB.PUBLIC TO ROLE IKUTY_CONTAINER_USER_ROLE; -- CREATE SERVICEに必要な権限 -- https://docs.snowflake.com/en/sql-reference/sql/create-service#access-control-requirements GRANT USAGE ON DATABASE IKUTY_DB TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT USAGE ON SCHEMA IKUTY_DB.PUBLIC TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT CREATE COMPUTE POOL ON ACCOUNT TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT CREATE IMAGE REPOSITORY ON SCHEMA IKUTY_DB.PUBLIC TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT CREATE SERVICE ON SCHEMA IKUTY_DB.PUBLIC TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT USAGE ON COMPUTE POOL IKUTY_SCS_POOL TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT IMPORTED PRIVILEGES ON DATABASE snowflake TO ROLE IKUTY_CONTAINER_USER_ROLE; -- GRANT READ ON STAGE IKUTY_SCS_STAGE TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT READ ON IMAGE REPOSITORY IKUTY_SCS_REPOSITORY TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE IKUTY_CONTAINER_USER_ROLE; Image Repositoryの作成 SPCSで使用するイメージを配置するリポジトリを作成します。 AWSのECR、AzureのACR的に、dockerコマンドから透過的にpushできるようです。 公式は以下。 CREATE IMAGE REPOSITORY USE ROLE IKUTY_CONTAINER_USER_ROLE; CREATE OR REPLACE IMAGE REPOSITORY IKUTY_SCS_REPOSITORY; SHOW IMAGE REPOSITORIES; SHOW IMAGEを叩くと repository_url が返ってます。 Image Repositoryにプッシュする 作成したImage Repositoryにローカルで作成したイメージをpushしていきます。 pushは指定されたタグを送信するという仕様のため、docker tagコマンドでイメージにタグを付けます。 docker tagの仕様はこちら。 ローカルで以下を行います。(サニタイズのため分かりづらいですが補完してください..) # タグをつける # docker tag $ docker tag app_front:latest /app_front:scs $ docker tag app_api:latest /app_api:scs # Snowflake Image Repositoryにログインする $ docker login -u Login Succeeded # イメージをpushする $ docker push /app_front:scs ... $ docker push /app_api:scs ... Compute Poolを作成する Compute Poolを作成します。 CREATE COMPUTE POOL CREATE COMPUTE POOL ikuty_scs_pool MIN_NODES = 1 MAX_NODES = 1 INSTANCE_FAMILY = CPU_X64_XS AUTO_RESUME = TRUE INITIALLY_SUSPENDED = FALSE AUTO_SUSPEND_SECS = 3600 ; 以下でCREATEしたCompute poolをDESCRIBEできます。 DESCRIBE COMPUTE POOL 自分の環境だと、CREATE COMPUTE POOLしてから15分ほどステータスがSTARTINGでした。 15分ぐらいして叩くとステータスがACTIVEに変わりました。(結構かかるイメージ) 以下、公式の実行例です。 DESCRIBE ikuty_scs_pool +-----------------------+--------+-----------+-----------+-----------------+--------------+----------+-------------------+-------------+--------------+------------+-------------------------------+-------------------------------+-------------------------------+--------------+---------+ | name | state | min_nodes | max_nodes | instance_family | num_services | num_jobs | auto_suspend_secs | auto_resume | active_nodes | idle_nodes | created_on | resumed_on | updated_on | owner | comment | |-----------------------+--------+-----------+-----------+-----------------+--------------+----------+-------------------+-------------+--------------+------------+-------------------------------+-------------------------------+-------------------------------+--------------+---------| | IKUTY_SCS_POOL | ACTIVE | 1 | 1 | CPU_X64_XS | 1 | 0 | 0 | false | 1 | 0 | 2023-05-01 11:42:20.323 -0700 | 2023-05-01 11:42:20.326 -0700 | 2023-08-27 17:35:52.761 -0700 | ACCOUNTADMIN | NULL | +-----------------------+--------+-----------+-----------+-----------------+--------------+----------+-------------------+-------------+--------------+------------+-------------------------------+-------------------------------+-------------------------------+--------------+---------+ Serviceの作成 フロント側、サーバ側の2つのServiceを作成していきます。 specについて、ステージにファイルを配置してそれを指定するスタイルのほかに、 以下のようにCREATE SERVICEに含めるスタイルがあるようです。 CREATE SERVICE フロント側のServiceは以下です。 CREATE SERVICE ikuty_api_service IN COMPUTE POOL ikuty_scs_pool FROM SPECIFICATION $$ spec: containers: - name: api-container image: endpoints: - name: api port: 8080 $$ ; サーバ側のServiceは以下です。 CREATE SERVICE ikuty_front_service IN COMPUTE POOL ikuty_scs_pool FROM SPECIFICATION $$ spec: containers: - name: front-container image: endpoints: - name: front port: 5173 public: true $$ ; SERVICE用のシステム関数 SaaSで動くコンテナの動作を確認するのは結構面倒なことなのかなと思います。 自分の操作に対してSaaS側で何が行われているのか知りたいことは結構あるのかなと思います。 SPCSには以下のコマンドがあるようです。 SYSTEM$GET_SERVICE_STATUS SYSTEM$GET_SERVICE_LOGS エンドポイントURLの取得 SHOW ENDPOINTS すると、Specで指定したpublicなendpointを得られました。 ingress_url に なんとかかんとか.snowflakecomputing.app というURLが入っています。 SHOW ENDPOINTS 動作確認 Computing poolのStatusがACTIVEになってから、エンドポイントURLをブラウザで開くと、 期待通り、Reactで作ったHello Worldアプリが表示されます。 SYSEM$GET_SERVICE_LOGS()でフロントサービスのログを覗くと、viteの起動ログが出ていました。 そうえいば 5173 を 80 に mapping する記述をどこにもしていないのですが、そうなっています。 > front@0.0.0 dev > vite VITE v4.5.2 ready in 314 ms ➜ Local: http://localhost:5173/ ➜ Network: http://10.244.1.3:5173/ ➜ Network: http://172.16.0.6:5173/ 同様に、サーバ側のログを覗くと、uvicornの起動ログが出ていました。 ViteのProxyから8080で繋がるので、こちらは8080が開いています。 INFO: Will watch for changes in these directories: [\'/app\'] INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit) INFO: Started reloader process [1] using StatReload INFO: Started server process [8] INFO: Waiting for application startup. INFO: Application startup complete. まとめ SnowflakeをWebサーバのインフラにするだけの内容で正直意味がないです。 しかし、APIでSnowflakeに触ったり、Reactで格好良い可視化をしたり、夢は広がります。 FastAPI,React,TypeScriptの恩恵ゼロなので、今後ちょっと凝ったものを作ってみます。