独立な確率変数の共分散がゼロであること
[mathjax] 共分散と相関係数の定義について過去に書いていた。 そもそも共分散が発生するのは、2つの確率変数が連動して動くから。 2つの確率変数が独立している場合は、共分散、相関係数共にゼロ。 共分散の定義 まず、共分散、相関係数の定義はこの通り。 2次元のデータ((x_1,y_1),(x_2,y_2),cdots,(x_n,y_n))が与えられた場合、 変数(x)と(y)の相関係数(r_{xy})は、それぞれの標準偏差(S_x,S_y)と、共分散(C_{xy})を使って以下となる。 begin{eqnarray} r_{xy} &=& frac{C_{xy}}{S_x S_y} \\ &=& frac{sum_{i=1}^n(x_i-bar{x})(y_i-bar{y})/n}{sqrt{sum_{i=1}^n{(x_i-bar{x})^2}/n} sqrt{sum_{i=1}^n{(y_i-bar{y})^2}/n}} \\ &=& frac{sum_{i=1}^n(x_i-bar{x})(y_i-bar{y})}{sqrt{sum_{i=1}^n{(x_i-bar{x})^2}} sqrt{sum_{i=1}^n{(y_i-bar{y})^2}}} \\ end{eqnarray} [clink url=\"https://ikuty.com/2018/08/13/correlation_coefficient/\"] [arst_adsense slotnumber=\"1\"] そもそもの共分散 確率変数(X),(Y)があったとする。それぞれの期待値は(E(X)),(E(Y))、分散は(V(X),V(Y))。 定義通りに(V(X+Y))を式展開していくと以下の通りになる。 begin{eqnarray} V(X+Y) &=& E(((X+Y)-mu_{X+Y})^2) \\ &=& E((X+Y-mu_x-mu_y)^2) \\ &=& E(((X-mu_x) + (Y-mu_y))^2) \\ &=& E((X-mu_x)^2) + E((Y-mu_y)^2) + 2E((X-mu_x)(Y-mu_y)) \\ &=& V(X) + V(Y) + 2E((X-mu_x)(Y-mu_y)) \\ &=& V(X) + V(Y) + 2C_{xy} end{eqnarray} ここで、(C_{xy}=2E((X-mu_x)(Y-mu_y)))を共分散としている。 (V(X+Y))は、(V(X))と(V(Y))の和に(C_{xy})で補正をかけた値になっている。 では、(X)と(Y)が独立であるとなぜ(C_{xy}=0)になるのか。 (C_{xy})を式変形していくと以下のようになるが、 begin{eqnarray} frac{1}{2} C_{xy} &=& E((X-mu_x)(Y-mu_y)) \\ &=& E(XY)-mu_yE(X)-mu_xE(Y) +mu_x mu_y \\ &=& E(XY) -mu_x mu_y - mu_x mu_y + mu_x mu_y \\ end{eqnarray} (X)と(Y)が独立であると(E(XY)=E(X)E(Y)=mu_x mu_u)となるから、 begin{eqnarray} frac{1}{2} C_{xy} &=& E(XY) -mu_x mu_y - mu_x mu_y + mu_x mu_y \\ &=& mu_x mu_y-mu_x mu_y - mu_x mu_y + mu_x mu_y \\ &=& 0 end{eqnarray} こうやって、独立であるなら共分散がゼロといえる。 [arst_adsense slotnumber=\"1\"]
AirflowでEnd-To-End Pipeline Testsを行うためにAirflow APIを調べてみた話
Airflow自体にDAGの実行結果をテスト(End-To-End Pipeline Tests)する仕組みは無いようで、 以下のような地道な仕組みを自力で作る必要がありそうです。 テストデータを用意する Airflowが提供するAirflow APIを使用してDAGを実行する DAGの終了を待つ 結果をAssertする 他にAirflow CLIも使えそうですが、pythonコードの一部にするならAPIの方が使い勝手が良さそうです。 API仕様書を上から読んでみたので、その感想を書いてみます。 他にもあるのですが、今回の用途に使いそうなものを抜粋しています。 \"読んでみた\"だけなので、誤りがあるかもしれません。概要を理解するぐらいの気持ちで読んでください。 [arst_toc tag=\"h4\"] Airflow API概要 今日時点のAirflow APIのAPI仕様書は以下です。 Airflow API (Stable) (2.10.0) RESTful APIとなっていて、Resourceに対するCRUDをHTTP Methodで表現します。 1つ、update_maskという考え方があります。リソースの値を更新する際、リソースjsonと同時に クエリパラメタで\"変更したい値は何か\"を渡すことで、リソースjsonの該当値のみを更新できます。 resource = request.get(\'/resource/my-id\').json() resource[\'my_field\'] = \'new-value\' request.patch(\'/resource/my-id?update_mask=my_field\', data=json.dumps(resource)) API Authenticationがusername/passwordで雑ですが、 DAGのis_pausedをtrueにするには、以下の通りpatchを叩くようです。 curl -X PATCH \'https://example.com/api/v1/dags/{dag_id}?update_mask=is_paused\' -H \'Content-Type: application/json\' --user \"username:password\" -d \'{ \"is_paused\": true }\' CORSを有効にする必要があります。Enabling CORS 様々なAPI認証が用意されています。API認証はAirflowのauth managerで管理されます。Authentication エラーはRFC7807準拠です。つまり、Unauthenticated、PermissionDenied、BadRequest、NotFound、MethodNotAllowed、NotAcceptable、AlreadyExistsが扱われます。Errors Connections ざっとAPIを眺めていきます。 まずはConnection。順当なCRUDです。patchでupdate_maskが使われます。 コードから一通りConnectionを触れそうです。 Testって何か調べてみました。 デフォルトでdisabledになっていますが、Airflow UI(Connections)から\"Test\"ボタンを押下できます。 Connectionと関連付けられたhookのtest_connection()メソッドを実行するようです。 これと同等の機能が動くようです。 Method Endpoint Overview Response GET /connections List Connection array of objects(ConnectionCollectionItem). POST /connections Create a Connection created connection. GET /connections/{connection_id} Get a connection connection PATCH /connections/{connection_id} Update a connection updated connection DELETE /connections/{connection_id} Delete a connection (Status) POST /connections/test Test a connection (Status) DAG 次はDAG。まずDAG一覧に対する操作。一覧に対してpatchを叩ける様子です。 Method Endpoint Overview GET /dags List DAGs in the database. dag_id_pattern can be set to match dags of a specific pattern PATCH /dags Update DAGs of a given dag_id_pattern using UpdateMask. This endpoint allows specifying ~ as the dag_id_pattern to update all DAGs. New in version 2.3.0 次は個別のDAGに対する操作。 Method Endpoint Overview GET /dags/{dag_id} Get basic information about a DAG.Presents only information available in database (DAGModel). If you need detailed information, consider using GET /dags/{dag_id}/details. PATCH /dags/{dag_id} Update a DAG. DELETE /dags/{dag_id} Deletes all metadata related to the DAG, including finished DAG Runs and Tasks. Logs are not deleted. This action cannot be undone.New in version 2.2.0 GET /dags/{dag_id}/tasks/detail Get simplified representation of a task. GET /dags/{dag_id}/detail Get a simplified representation of DAG.The response contains many DAG attributes, so the response can be large. If possible, consider using GET /dags/{dag_id}. Airflowにおいて、Operatorのインスタンスに\"Task\"という用語が割り当てられています。 つまり、「Operatorに定義した処理を実際に実行すること」が\"Task\"としてモデリングされています。 「\"Task\"をA月B日X時Y分Z秒に実行すること」が、\"TaskInstance\"としてモデリングされています。 あるDAGは、実行日/実行時間ごとの複数の\"TaskInstance\"を保持しています。 以下のAPIにおいて、DAGが保持する\"Task\",\"日付レンジ\"等を指定して実行します。 \"TaskInstance\"を\"Clear(再実行)\"します。また、\"TaskInstance\"の状態を一気に更新します。 Method Endpoint Overview POST /dags/{dag_id}/clearTaskInstances Clears a set of task instances associated with the DAG for a specified date range. POST /dags/{dag_id}/updateTaskInstancesState Updates the state for multiple task instances simultaneously. GET /dags/{dag_id}/tasks Get tasks for DAG. なんだこれ、ソースコードを取得できるらしいです。 Method Endpoint Overview GET /dagSources/{file_token} Get a source code using file token. DAGRun \"Task\"と\"TaskInstance\"の関係と同様に\"DAG\"と\"DAGRun\"が関係しています。 「A月B日X時Y分Z秒のDAG実行」が\"DAGRun\"です。DAGRun。順当な感じです。 新規にトリガしたり、既存のDAGRunを取得して更新したり削除したり、再実行したりできます。 Method Endpoint Overview GET /dags/{dag_id}/dagRuns List DAG runs.This endpoint allows specifying ~ as the dag_id to retrieve DAG runs for all DAGs. POST /dags/{dag_id}/dagRuns Trigger a new DAG run.This will initiate a dagrun. If DAG is paused then dagrun state will remain queued, and the task won\'t run. POST /dags/~/dagRuns/list List DAG runs (batch).This endpoint is a POST to allow filtering across a large number of DAG IDs, where as a GET it would run in to maximum HTTP request URL length limit. GET /dags/{dag_id}/dagRuns/{dag_run_id} Get a DAG run. DELETE /dags/{dag_id}/dagRuns/{dag_run_id} Delete a DAG run. PATCH /dags/{dag_id}/dagRuns/{dag_run_id} Modify a DAG run.New in version 2.2.0 POST /dags/{dag_id}/dagRuns/{dag_run_id}/clear Clear a DAG run.New in version 2.4.0 以下はスキップ.. Method Endpoint Overview GET /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents Get datasets for a dag run.New in version 2.4.0 PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/setNote Update the manual user note of a DagRun.New in version 2.5.0 DAGWarning DAGのimport_errors一覧を返します。 Method Endpoint Overview GET /dagWarnings List Dag Waranings. DAGStats A DAG Run status is determined when the execution of the DAG is finished. The execution of the DAG depends on its containing tasks and their dependencies. The status is assigned to the DAG Run when all of the tasks are in the one of the terminal states (i.e. if there is no possible transition to another state) like success, failed or skipped. The DAG Run is having the status assigned based on the so-called “leaf nodes” or simply “leaves”. Leaf nodes are the tasks with no children. There are two possible terminal states for the DAG Run: success if all of the leaf nodes states are either success or skipped, failed if any of the leaf nodes state is either failed or upstream_failed. Method Endpoint Overview GET /dagStats List Dag statistics. ImportError Airflow Best PractiveのTesting a DagにDAGのテスト観点に関する記述が(サラッと)書かれています。 まず、DAGは普通のpythonコードなので、pythonインタプリタで実行する際にエラーが起きないことを確認すべし、とのことです。 以下の実行により、未解決の依存関係、文法エラーをチェックします。もちろん、どこで実行するかが重要なので、DAG実行環境と合わせる必要があります。 Airflow APIにより、このレベルのエラーがDAGファイルにあるか確認できるようです。 $ python your-dag-file.py Method Endpoint Overview GET /importErrors List import errors. GET /importErrors/{import_error_id} Get an import error. Variables DAGに記述したくないCredentials等を管理する仕組みで、Airflow UIからポチポチ操作すると作れます。 Variableはkey-valueそのままです。DAGからkeyを指定することで参照できます。 Airflow APIからもVariableをCRUDできます。 Method Endpoint Overview GET /variables List variables.The collection does not contain data. To get data, you must get a single entity. POST /variables Create a variable. GET /variables/{variable_key} Get a variable by key. PATCH /variables/{variable_key} Update a variable by key. DELETE /variables/{variable_key} Delete a variable by key. まとめ RESTfulAPIが用意されているということは、内部のオブジェクトをCRUD出来るということなのだろう、 という推測のもと、Airflow APIのAPI仕様書を読んで感想を書いてみました。 Airflowの概念と対応するリソースはAPIに出現していて、End-To-End Pipeline Testを書く際に、Assert、実行制御を記述できそうな気持ちになりました。 Assert、実行制御、だけなら、こんなに要らない気もします。 API呼び出し自体の煩雑さがあり、Testの記述量が増えてしまうかもしれません。 以下の記事のようにwrapperを書く必要があるかもしれません。 https://github.com/chandulal/airflow-testing/blob/master/src/integrationtest/python/airflow_api.py DAGの入力側/出力側Endに対するファイル入出力は別で解決が必要そうです。 「API仕様書を読んでみた」の次の記事が書けるときになったら、再度まとめ記事を書いてみようと思います。
Azure Functionsの機能まとめ(座学版)
タイトルの通り、Azure Functionsの機能をまとめてみた。 [arst_toc tag=\"h4\"] 課金モデル 課金モデルが5パターンあるのではなく、運用方式が5パターンあり、それぞれ課金方式が違う。 呼称がMECEでなかったり公式ドキュメントで表記揺れが存在したり親切でない点はある。 Premium、DedicatedはApp Service Planで動かすことができ、かなり微妙に繋がっている。 実質的にPremium、DedicatedはApp Service Planで実現され課金がかかる。 コールドスタートに対する改善の歴史を感じる。 課金モデル 概要 従量課金 オーソドックスなFaaS。名前の通り資源の使用量に応じて課金。必要最低限のネットワーク分離が提供される。既存VNetとの統合は不可。コールドスタート。アプリのロード・アンロードが頻繁に発生し、しばしば遅い。 Premium 資源の使用量に応じて課金。従量課金よりも高機能な従量課金(言葉辛い..)。既存VNetとの統合がサポートされる。コールドスタートを回避するために用意された。インスタンス数をゼロまでスケールインさせないことでホットスタートを実現している。アクティブなインスタンスのコア数(vCPU/h)、メモリ使用量(GB/h)に課金。裏側はApp Service Planだが手持ちのカスタムイメージをACRに登録しApp Serviceにホストすることが可能。 Dedicated 通常のApp Service Planとして課金される。既にApp Serviceインスタンスを実行しており新たにFunctionを相乗りさせる時に使用する。従量課金的な要素が無いので(高価だけれども)コストを予測できる。 App Service Environment(ASE) 超強力なDedicated。1人の顧客に限定された専用環境。ASE v1,v2,v3と脈々と新しい奴が作られている。高スケール、分離およびセキュリティで保護されたネットワーク アクセス、高いメモリ使用率などが書かれている。マルチリージョンにまたがって構成できる。高RPS(Requests per Seconds)ワークロード向けに用意されるApp Serviceの強化版。 Container Apps Hosting Azure Container Appsでコンテナ化されたFunctionsの開発・デプロイ・管理。Kubernetes ベースの環境で関数を実行できる。現在プレビュー。 従量課金とPremiumの違い リッチな従量課金プランであるPremiumについて詳細なドキュメントがある。 Azure Functions の Premium プラン そのメリットとして、以下が列挙されている。 インスタンスをウォーム状態に維持することでコールド スタートを回避します 仮想ネットワーク接続 より長いランタイム期間をサポートします Premium インスタンス サイズの選択 従量課金プランと比較して、予測可能な料金 複数の Function App を含むプランでの高密度アプリ割り当て 従量課金プランは、インスタンス数をゼロまでスケールインできる。 その結果としてその料金の料金はかからない一方、リクエストが来たときにゼロから1個以上まで スケールアウトする際に\"コールドスタート\"時間を要する。 Premiumプランには、\"常時使用可能なインスタンス\"という考え方がある。 要はインスタンス数をゼロまでスケールインさせず、常にアクティブにしておくということらしい。 当然、\"常時使用可能なインスタンス\"は常時課金される。 他に\"事前ウォーミング可能なインスタンス\"という考え方がある。 常時使用可能なインスタンスが負荷分散してリクエストを捌いている間、 事前ウォーミング可能なインスタンスが後で立ち上がる。常時使用可能なインスタンスの負荷が 規定値を超えると、事前ウォーミング可能なインスタンスがアクティブに昇格し捌き始める。 事前ウォーミング可能なインスタンスは昇格するまでの間立派に課金されてしまう。 Premiumプランは実際はApp Serviceの仕組みで動く。 プラン名に規約がありEで始めるとElastic Premium、つまり、App Serviceで動かすPremiumということになる。また、Pで始めると動的スケールしないDedicated Hostingプランということになる。 Azure Functions は Azure App Service プラットフォームで実行できます。 App Service プラットフォームでは、Premium プラン関数アプリをホストするプランは Elastic Premium プランと呼ばれており、EP1 のような SKU 名があります。 Premium プランで関数アプリを実行することを選択した場合、EP1 のように \"E\" で始まる SKU 名を持つプランを必ず作成してください。 P1V2 (Premium V2 Small プラン) のように \"P\" で始まる App Service プラン SKU 名は実際には Dedicated ホスティング プランです。 Dedicated であり、Elastic Premium ではないため、\"P\" で始まる SKU 名のプランは動的にスケールせず、コストが増えることがあります。 実行継続時間 従量課金プランは1回の実行の最大は10分。Premiumプランはデフォルトで最大30分。 ただし、Premiumプランの最大値は延長して無制限まで拡張できる。 プラットフォームのアップグレードにより、マネージド シャットダウンがトリガーされ、関数の実行が停止する可能性があります プラットフォームの停止により、処理されないシャットダウンが発生し、関数の実行が停止する可能性があります 新しい実行がない状態で 60 分経つと worker を停止するアイドル タイマーがあります スケールイン動作により、60 分後に worker のシャットダウンが発生する可能性があります スロットのスワップにより、スワップ中にソース スロットとターゲット スロットの実行が終了される可能性があります これはFunctionのタイムアウト期間であって、HTTPトリガーの応答にはAzure Load Balancerの タイムアウト期間(=230秒)が適用される。HTTPトリガで長時間処理を実現する場合、 Durable Functionで作るか、即時応答・非同期処理のパターンにすべきとのこと。 Function App タイムアウト期間 Durable Functions とは 実行時間の長い関数を使用しない HTTPトリガで長時間処理を実装するパターン 可能な限り、大きな関数は、連携して高速な応答を返す、より小さな関数セットにリファクタリングしてください。 たとえば、webhook または HTTP トリガー関数では、一定の時間内に確認応答が必要になる場合があります。webhook は通常、即座に応答を必要とします。 この HTTP トリガー ペイロードは、キュー トリガー関数によって処理されるキューに渡すことができます。 このアプローチを使用すると、実際の作業を遅らせて、即座に応答を返すことができます。 ネットワーク 既存のAzureリソースとAzure Functionsを連携する際に、どのように既存リソースと連携できるか、 各実現方式毎にやれることが決まっている。以下が参考になった。 Azure Functions のネットワーク オプション 特徴 従量課金 Premium Dedicated ASE 受信アクセス制限 ✅ ✅ ✅ ✅ プライベートエンドポイント ❌ ✅ ✅ ✅ 仮想ネットワークの統合 ❌ ✅ ✅ ✅ VNet Trigger(非HTTP) ❌ ✅ ✅ ✅ Hybrid接続 ❌ ✅ ✅ ✅ 送信IPの制限 ❌ ✅ ✅ ✅ 受信アクセス制限は、送信元のIPアドレスに対するAllow/Denyを設定する機能。 IPv4/v6のIPアドレスを直接指定するか、サービスエンドポイントを使用するVNetのサブネットを指定可。 より詳細な記述は、Azure App Service のアクセス制限を設定するを参照。 プライベートエンドポイントは、VNet内からプライベートエンドポイントを介したPrivateLink接続。 AWS VPCと異なり、Azure VNetはリソースの論理的なグルーピングに過ぎない、という側面があり、 通信を秘匿化したいという文脈でなくても、PrivateLinkを使って連携せざるを得ない事情がある。 プライベートエンドポイントのDNSはAzureが良しなに作ってくれる。 仮想ネットワークの統合(VNet統合)は、Azure Functionsを指定のVNetに論理的に配置するオプション。 これにより、FunctionからVNet内のリソースにアクセスできるようになる。 FunctionからVNet内リソースに対して送信呼び出しを行うために使われる。逆には使われない。 従量課金ではN.G.だがPremiumクラスの従量課金なら可能になる。これはメリット。 リージョン内であれば、VNet側にVirtual Network Gatewayは必要ないがリージョン間であれば必要。 Virtual Network Gatewayを必要とする場合、通信に大きな制約がかかる。 なお、Azure FunctionsをASEで運用する場合、FunctionはASE内に物理的に配置されるため、 論理的なVNet統合を行う必要はないとのこと。 トリガについては後述する。オーソドックスな従量課金モデルはHTTPトリガしかサポートしない。 Premium以降で他のトリガが解放される。 ハイブリッド通信は、Windowsで動作している従量課金以外の全てのFunctionについて、 他のネットワークのリソースにアクセスできる機能。Azure Relayという機能の1つ。 Windowsを使わないといけないため特殊な用途となる。省略。 トリガとバインド トリガーによりFunctionが発火し実行される。つまりトリガーにより関数の呼び出し方法を定義する。 トリガーとバインドについてはAzure Functions でのトリガーとバインドの概念が参考になる。 トリガーにはデータが紐付けられていて、呼び出しの際のペイロードとなる。 バインドとは、別のリソースを宣言的に接続する方法。入力バインド/出力バインドがある。 バインドからのデータは、Functionから見てパラメータとして利用できる。 Azure Functionsのバージョンにより対応可否が異なる。現在のバージョンはv4。 比較的マイナーと思われるものについて、割と昔出来ていたことが出来なくなったパターンが多い。 Kafka、RabbitMQは従量課金プランではサポートされない。 Typev1.xv2.x以降トリガー入力出力 Blob Storage✔✔✔✔✔ Cosmos DB✔✔✔✔✔ Azure Data Explorer✔✔✔ Azure SQL✔✔✔✔ Dapr✔✔✔✔ Event Grid✔✔✔✔ Event Hubs✔✔✔✔ HTTP✔✔✔✔ IoT Hub✔✔✔ Kafka✔✔✔ Mobile Apps✔✔✔ Notification Hubs✔✔ Queue Storage✔✔✔✔ Redis✔✔ Rabbit MQ✔✔✔ SendGrid✔✔✔ Service Bus✔✔✔✔ SignalR✔✔✔✔ Table Storage✔✔✔✔ Timer✔✔✔ Twillo✔✔✔ 例えば、HTTPトリガーとバインドの例は以下。 RESTfulAPI的にURLにペイロードを含めることができる。 (ドキュメントを見ても何が正解が分からないし、もちろんどこかに実行例がある訳でもない) ここで、リクエストパラメタが入力バインド、レスポンスが出力バインド、ということになる..(のかな)。 import logging import azure.functions as func @app.function_name(name=\"httpTrigger\") @app.route(route=\"products/{category:alpha}/{id:int?}\" auth_level=func.AuthLevel.ANONYMOUS) def main(req: func.HttpRequest) -> func.HttpResponse: category = req.route_params.get(\'category\') id = req.route_params.get(\'id\') message = f\"Category: {category}, ID: {id}\" return func.HttpResponse(message) こうしておくと、例えば以下のURLで定義したhttpTriggerを実行できる。 http://.azurewebsites.net/api/products/electronics/357 auth_levelは認可レベル。URLのリクエストに必要な認可キーを指定する。 ANNONYMOUSなら不要、FUNCTIONなら関数固有のAPIキー、ADMINならマスターキー(?)。 詳細はこちら。 まとめ Azureドキュメントを見ながらAzure Functionの概要をまとめてみた。 実装例が少なくまとまったドキュメントが少ない、という問題があり、 座学版の他に「やってみた」を繰り返す必要がありそう。
PostgreSQL スキーマをコピーする
スキーマをコピーする方法はない。 代わりに以下の方法で同じ効果を得る。 スキーマ名Aをスキーマ名Bに変更する スキーマ名Bの状態でpg_dumpする スキーマ名Bをスキーマ名Aに変更する スキーマ名Bを作成する pg_dumpしたファイルをリストアする Statementは以下の通り。 $ psql -U user -d dbname -c \'ALTER SCHEMA old_schema RENAME TO new_schema\' $ pg_dump -U user -n new_schema -f new_schema.sql dbname $ psql -U user -d dbname -c \'ALTER SCHEMA new_schema RENAME TO old_schema\' $ psql -U user -d dbname -c \'CREATE SCHEMA new_schema\' $ psql -U user -q -d dbname -f new_schema.sql $ rm new_schema.sql [arst_adsense slotnumber=\"1\"]
AWS常時SSL リダイレクトループしない.htaccessの書き方
HTTPSを強制するために .htaccess に細工をするのは有名。例えば以下のような書き方が王道。 RewriteEngine on RewriteCond %{HTTPS} off RewriteRule ^(.*)$ https://%{HTTP_HOST}%{REQUEST_URI} [R,L] これをそのままElasticBeanstalkにデプロイするとリダイレクトループが発生する。正確に書くとリバースプロキシ(ロードバランサ)が有効になっている場合にリダイレクトループが発生する。 原因 原因についてはココが詳しい。ざっくりまとめると、 ロードバランサが443へのアクセスを80へのアクセスに変換する .htaccess内の RewriteCond ${HTTPS} が永遠に on にならず、リダイレクトの度にRewriteRule が走ってしまう 元々のアクセスが https か http のどちらかが分かれば良いのだが、上記の挙動のせいで、https にリダイレクトしたとしても http からアクセスされたことになり、これが永遠に繰り返されてしまう。 解決策 (記事主さんが)無茶苦茶泥臭く挙動を追跡したところ、ロードバランサに到着した元のアクセスが http のときに限り、X-Forwarded-Proto というヘッダが付与され値が入るらしい。なので、X-Forwarded-Protoヘッダの内容を http か https かの判断基準にすれば良い、というのが基本的なアイデア。本人も言っているが、it\'s just an empiric result... である。 その .htaccess が以下 RewriteEngine On # Force HTTPS RewriteCond %{HTTP:X-Forwarded-Proto} !=https RewriteRule ^/?(.*) https://%{HTTP_HOST}/$1 [R,L] これを ElasticBeanstalkにデプロイすると見事に動作する。 [arst_adsense slotnumber=\"1\"] 開発環境との共存 開発もAWSで行っていればこれで良いのだがそうでない場合も多いと思う。 上記のAWS用.htaccessを非AWSな開発環境に持ってくると今度は RewriteCond %{HTTP:X-Forwarded-Proto} !https が常に真になり、リダイレクトループが発生する。 あっちが立てばこっちが立たない! いろいろ試行錯誤した結果、以下なら両立できた。(2017/7/8訂正) RewriteEngine On # Force HTTPS RewriteCond %{HTTPS} !=on RewriteCond %{HTTP:X-Forwarded-Proto} !=https RewriteRule ^/?(.*) https://%{HTTP_HOST}/$1 [R,L] 根拠となる X-Forwarded-Proto がとっても経験的!なので、いつの日か使えなくなる日が来るかもしれない。 [arst_adsense slotnumber=\"1\"]
Snowflake MCPサーバを試してみた
何周遅れか分からないが、Snowflake MCPサーバを試してみたのでアウトプットしてみる。 AI AgentはClaude Code。MCPの構築と接続設定自体をClaude Codeで行なった。 この記事で使用したMCPサーバは以下。いわゆる野良MCPサーバ。 [clink implicit=\"false\" url=\"https://github.com/isaacwasserman/mcp-snowflake-server\" imgurl=\"https://camo.githubusercontent.com/bdcfca988b369e51051c3201cedfc429354b0801a0c5d88aa3eb00ae37e7188b/68747470733a2f2f6d736565702e6e65742f70722f69736161637761737365726d616e2d6d63702d736e6f77666c616b652d7365727665722d62616467652e706e67\" title=\"Snowflake MCP Server\" excerpt=\"A Model Context Protocol (MCP) server implementation that provides database interaction with Snowflake. This server enables running SQL queries via tools and exposes data insights and schema context as resources.\"] [arst_toc tag=\"h4\"] 前提となる環境 Macにnode、uv、Claude Codeを導入済み。 # 諸々のバージョンは以下 $ sw_vers ProductName: macOS ProductVersion: 15.6 BuildVersion: 24G84 # nodeは導入済み $ node --version v24.4.1 # uvは導入済み $ uv --version 0.8.13 (ede75fe62 2025-08-21)0.8.13 (ede75fe62 2025-08-21) # Claude Codeは導入済み $ claude --version 1.0.89 (Claude Code) # 検証用ディレクトリの作成と移動。以降ここで検証を実施。 $ mkdir snowflake-mcp-server && cd $_ 環境構築 プロンプトとその回答は省略する。要件を伝え環境構築を実施してもらった。 Claudeに対するMCPの設定方法としてグローバルに設定する方法と、 プロジェクト個別に設定する方法があるが、当初グローバル設定の方法を案内された。 2回目のプロンプトでプロジェクト個別設定に変更してもらった。 最終的なファイル構成は以下の通り。 $ tree . . ├── mcp-snowflake-server │ ├── Dockerfile │ ├── example_connections.toml │ ├── LICENSE │ ├── pyproject.toml │ ├── README.md │ ├── runtime_config.json │ ├── src │ │ └── mcp_snowflake_server │ │ ├── __init__.py │ │ ├── db_client.py │ │ ├── serialization.py │ │ ├── server.py │ │ └── write_detector.py │ └── uv.lock ├── PROJECT_SPECIFIC_SETUP.md ├── SETUP.md ├── snowflake_connections.toml └── test_server.py snowflake_connections.toml に Snowflakeの接続設定を書くだけで終了。 試してみた claudeを起動すると .mcp.json が認識された。 $ claude ╭─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ │ │ │ New MCP server found in .mcp.json: snowflake │ │ │ │ MCP servers may execute code or access system resources. All tool calls require approval. Learn more in the MCP documentation │ │ (https://docs.anthropic.com/s/claude-code-mcp). │ │ │ │ ❯ 1. Use this and all future MCP servers in this project │ │ 2. Use this MCP server │ │ 3. Continue without using this MCP server │ │ │ ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ > Snowflakeのデータベース `sample_db` におけるスキーマの一覧を教えてください。 ⏺ snowflake - list_schemas (MCP)(database: \"sample_db\") ⎿ type: data data_id: e40f8385-e3c0-4848-aa36-c1b8fada788f database: sample_db … +8 lines (ctrl+r to expand) ⎿ [Resource from snowflake at data://e40f8385-e3c0-4848-aa36-c1b8fada788f] { \"type\": \"data\", \"data_id\": \"e40f8385-e3c0-4848-aa36-c1b8fada788f\", … +25 lines (ctrl+r to expand) ⏺ sample_db データベースには以下の7つのスキーマがあります: - CLEANSED - INTERMEDIATE - PUBLIC - SAMPLE_SCHEMA > cleansedスキーマ内のテーブルの一覧を教えてください。 ⏺ CLEANSED スキーマには以下の19のテーブルがあります: ... メタ情報(description) dbtでモデルを作る際にdescriptionを付与することで、テーブル、カラムにdescriptionを付与できる。 その先、セマンティックレイヤーの存在がチラつくが、descriptionだけでもそこそこいける。 まとめ Claude Codeにローカル起動型(Dockerfile)のSnowflake MCPサーバを導入してみた。 その際、Claude Code自身にMCPサーバの導入を依頼し、ほぼノーコードで導入できることを確認した。 テーブル、カラムにdescriptionを付与しておくと、MCPサーバを通じて取得・消費される。 クエリによって得られる知識がClaudeに蓄積され、知識の文脈で会話ができるようになることを確認した。
Snowpark Container Services上でWebアプリ(FastAPI/React/TypeScript)を動かしてみた
シンプルな Multi-Container App を動かしている以下の記事にインスパイアされてみました。 以下の記事では、Docker networkを前提にフロントがサーバの名前解決を行っています。 これをデプロイすると、ブラウザで動くフロントコードがサーバの名前を解決できません(SPCS無関係)。 リバースプロキシを挟んでプライベートなダウンストリームにAPIを配置する方法が良さそうです。 今回の記事はSPCSの動作確認をすることが目的なので凝ったことはせず、 ViteをそのままデプロイしてProxyで解決してみたのでご紹介します。 [clink implicit=\"false\" url=\"https://medium.com/@maseedilyas9848/snowflake-container-mastery-step-by-step-deployment-of-your-multi-container-app-with-snowpark-211682514851\" imgurl=\"https://miro.medium.com/v2/resize:fit:1400/format:webp/1*t7s-Rl6F4BBV-yYs-ovODQ.png\" title=\"Snowflake Container Mastery: Step-by-Step Deployment of Your Multi-Container App with Snowpark Container Services\" excerpt=\"The buzz around town is all about Snowflake’s latest product feature, “Snowpark Container Services” and the excitement is real. Now, with the feature hitting public preview in various AWS regions, this blog dives into the nitty-gritty of what container services bring to the table. Join me as we explore what makes this feature tick and unravel the steps to deploy a multi-container app within Snowflake. Let’s break it down!\"] [arst_toc tag=\"h4\"] SPCSのアーキテクチャ Image Registryはイメージリポジトリです。AWSのECR、AzureのACRと似た感じで操作できます。 Container Serviceはデプロイメントの単位で、1個以上のコンテナをデプロイできます。 Compute Poolは計算資源で、複数のContainer Serviceが共有します。 Serviceは基本的にはPrivateなリソースとなりますが、SpecでEndpointsを定義することで、 Publicリソースとすることができます。自動的に80にmapされます。 Serviceには以下のフォーマットでDNS名が付きます。 Service同士は以下のDNS使ってプライベート通信できます。 同一DB、Schema内に作成したServiceは先頭のServiceNameが異なるだけとなりますが、 その場合に限り、ServiceNameだけで互いの名前解決ができます。 Service間連携については、公式で\"Service-to-Service\"というパターンで紹介されています。 <Service Name>-<Schema Name>-<DB Name>.snowflakecomputing.internal 1つのService内に複数のコンテナを配置することができます。 同一Service内で各コンテナ内の通信したいと考えたのですが、方法を見つけることができませんでした。 (出来ないはずはなさそうで方法はあるのかもしれません..) 今回作るもの フロント側、サーバ側の2つのコンテナを、それぞれ別々のServiceとしてデプロイします。 フロント側をPublic、サーバ側はPrivateとします。概要は以下の通りです。 フロント側 Vite React/TypeScript ReactからのAPIリクエストは一旦ViteのProxyで受けて、ProxyからAPIに流す サーバ側にGETリクエストして応答を表示するだけ サーバ側 uvicorn FastAPI poetry GETリクエストを受けて\"Hello World\"をJSONで返すだけ 思いっきり開発用な感じですが、SPCSの動作確認が目的ですのでこれでいきます。 ローカルで動作確認をして、SPCSにデプロイします。ファイル構成は以下の通りです。 $ tree . -I node_modules -I __pycache__ . ├── api │ ├── Dockerfile │ ├── api.py │ ├── app.py │ ├── main.py │ └── pyproject.toml ├── compose.yml └── front ├── Dockerfile ├── entry.sh ├── index.html ├── package-lock.json ├── package.json ├── src │ └── hello.tsx ├── tsconfig.json ├── tsconfig.node.json └── vite.config.ts compose.yml サーバ側、フロント側の2つのコンテナが、サーバ側->フロント側の順に起動するように書きます。 それぞれ、8080、5173 で待つようにします。 services: api: build: api volumes: - ./api:/app ports: - 8080:8080 front: build: front volumes: - ./front:/app ports: - 5173:5173 depends_on: - api サーバ側 Snowflakeの公式のチュートリアルだとFlaskが使われています。 今回は、最近使い始めたFastAPIを使ってAPIサーバを立ててみようと思います。 FlaskとFastAPIの比較はこちらが詳しいです。 FastAPIの特徴はPydanticによるデータ検証とAsyncI/O。TypeScriptのように型チェックできます。 パッケージマネージャにはpoetryを使います。デファクトが無いPythonのパッケージ管理界隈で npmやcomposer的な使い勝手が提供されます。 FastAPIの公式では、Python用Webサーバのuvicornを使ってホストされています。 uvicornでFastAPIを動かすコンテナを1個立てていきます。 Dockerfile 最新のPythonのイメージにpoetryをインストールします。 公式がインストーラを配布していて公式の手順通りに叩けばインストールできます。 公式のガイドの通り、POETRY_HOME=/etc/poetry とします。 Installation with the official installer FROM python:3.12 # 公式の通り /etc/poetry にインストールする ENV POETRY_HOME=/etc/poetry RUN curl -sSL https://install.python-poetry.org | python - ENV PATH $POETRY_HOME/bin:$PATH WORKDIR /app COPY . . RUN poetry install CMD [\"python\",\"main.py\"] pyproject.toml バニラのPythonとpyproject.tomlだけで依存関係を考慮したパッケージ管理が出来ますが、 要はnpmやbundle,composer的な使い勝手に寄せたパッケージ管理に対する需要があります。 poetry用の依存関係を書いていきます。fastapi、uvicornの現在(2/16)の最新を指定します。 [tool.poetry] name = \"test\" [tool.poetry.dependencies] python = \"^3.12\" fastapi = \"^0.109.2\" uvicorn = \"^0.27.1\" api.py [GET] /hello に対して Hello World 的な JSON を返す FastAPI の Hello Worldです。 後のapp.pyで FastAPIインスタンスに紐付けます。app.pyと分離することでロジックを分離できます。 from fastapi import APIRouter router = APIRouter() @router.get(\"/hello\") async def hoge(): return {\"result\":\"Hello World\"} app.py FastAPIのHello Worldコードです。Dockerfileから開始します。 from api import router as api_router from fastapi import FastAPI app = FastAPI() app.include_router(api_router, prefix=\"/api\") main.py pythonコードからuvicornを起動します。uvicorn.runの公式の仕様はこちら。 第1引数の\"app:app\"は\"app\"モジュールの中の\"app\"オブジェクトという表記です。 app.pyに記述したFastAPI()のインスタンスappを指します。stringで渡す必要があります。 hostは\"0.0.0.0\"を指定します。なぜ\"127.0.0.1\"でないのかはこちらが参考になります。 今回はPort=8080で起動します。reload=Trueとすると、HotReload機能が有効になります。便利。 import uvicorn if __name__ == \"__main__\": uvicorn.run( \"app:app\", host=\"0.0.0.0\", port=8080, reload=True, ) 起動してみる docker compose up して http://localhost:8080/docs を開くと以下が表示されます。 ちゃんとJSONでHello Worldが戻りました。 フロント側 VueとReactの開発用に使われるローカル開発用のサーバ Vite をホストするコンテナを立てます。 Viteは Vue.jsの開発者Evan You氏が開発したJavaScript/TypeScriptで、ヴィートと読み、 フランス語で\"素早い\"という意味だそう。(webpackのように)リソースバンドルが不要で起動が速い。 (Laravelも9.xでwebpackを捨ててViteになってた..) 素の状態でTypeScriptを扱えるため、すぐにTypeScriptを書き始められる特徴があります。 Dockerfile nodeのrelease scheduleはこちら。 2/17のnodeのActiveLTSのMajor Versionは20で、2026-04-30がEnd of lifeとなっています。 これを使いたいので node:20 を指定します。 FROM node:20 WORKDIR /app COPY . . RUN npm install ENTRYPOINT [ \"./entry.sh\" ] entry.sh Dockerfile の ENTRYPOINT で npm run dev するだけのshです。 #!/bin/bash npm run dev package.json npm create vite@latest で Prjディレクトリ内に様々なファイルが作られます。 package.jsonも作られます。 Hello World で必要なもの以外を削ってみました。 npm install後、npm run devで viteを実行します。 TS用のconfigは別です。 他に生成されるpackage-lock.jsonが必要ですが省略します。 { \"name\": \"front\", \"private\": true, \"version\": \"0.0.0\", \"scripts\": { \"dev\": \"vite\" }, \"dependencies\": { \"react\": \"^18.2.0\", \"react-dom\": \"^18.2.0\" }, \"devDependencies\": { \"@vitejs/plugin-react\": \"^4.2.1\" } } tsconfig.json viteのPrj生成で自動生成されるTS用のconfigファイルです。 手をつけずに配置します。 { \"compilerOptions\": { \"target\": \"ES2021\", \"useDefineForClassFields\": true, \"lib\": [\"ES2021\", \"DOM\", \"DOM.Iterable\"], \"module\": \"ESNext\", \"skipLibCheck\": true, /* Bundler mode */ \"moduleResolution\": \"bundler\", \"allowImportingTsExtensions\": true, \"resolveJsonModule\": true, \"isolatedModules\": true, \"noEmit\": true, \"jsx\": \"react-jsx\", /* Linting */ \"strict\": true, \"noUnusedLocals\": true, \"noUnusedParameters\": true, \"noFallthroughCasesInSwitch\": true }, \"include\": [\"src\"], \"references\": [{ \"path\": \"./tsconfig.node.json\" }] } tsconfig.node.json viteのPrj生成で自動生成されるTS用のconfigファイルです。 手をつけずに配置します。 { \"compilerOptions\": { \"composite\": true, \"skipLibCheck\": true, \"module\": \"ESNext\", \"moduleResolution\": \"bundler\", \"allowSyntheticDefaultImports\": true }, \"include\": [\"vite.config.ts\"] } vite.config.ts viteのPrj生成で自動生成されるvite用のconfigファイルです。 設定ファイルが.tsなところが凄いです。普通にimport文を書けます。 上のuvicornの起動で127.0.0.1ではなく0.0.0.0を指定したのと同様に、 viteも127.0.0.1ではなく0.0.0.0で待たせる必要があります。 serverオプションのhostにtrueを設定すると、0.0.0.0となります。公式 FastAPIの同一パスと対応するProxyを設定します。 以下で、server.proxy.api.target は SPCS上のAPIコンテナのPrivateエンドポイント を表します。 DNS名はサービス単位で作られます。本来長いFQDNを指定する必要がありますが、 同一スキーマに作られたサービスに限り、サービス名だけで解決できるようです。 DNS名はアンダースコア(_)がハイフン(-)に置き換わります。6時間くらいハマりました.. 後で ikuty_api_service サービスを作りますが、ikuty-api-serviceを 使います。 詳細は以下を参照してください。 Service-to-service communications import { defineConfig } from \'vite\' import react from \'@vitejs/plugin-react\' export default defineConfig({ plugins: [react()], server: { host: true, proxy: { \"/api\": { target: `http://ikuty-api-service:8080/`, changeOrigin: true } } }, }) index.html Reactのコンポーネントを表示するガワとなるhtmlです。 <!DOCTYPE html> <html> <head> <meta charset=\"UTF-8\" /> <meta name=\"viewport\" content=\"width=device-width, initial-scale=1.0\" /> <script type=\"module\" src=\"/src/hello.tsx\" defer></script> </head> <body> <div id=\"root\">Waiting...</div> </body> </html> src/hello.tsx ようやくHello WorldするReactコンポーネントの本体です。 画面にはvalueというStateを表示しています。APIのURLは上記の通りproxyとします。 今回作成した/api/hello APIの応答を受けた後、setValueによりStateを更新します。 import React from \'react\' import { useState } from \'react\' import ReactDOM from \'react-dom/client\' const App = () => { const [value, setValue] = useState(\'\') const url = \'/api/hello\' fetch(url,{}) .then(res=>res.json()) .then(data=>setValue(data[\'result\'])) return ( {value},{} ) } ReactDOM.createRoot(document.getElementById(\'root\')!).render( ) 起動してみる docker compose up すると、ほとんど一瞬でviteが起動します。 http://localhost:5173 を開きます。 Waiting...という表示が一瞬で Hello World に書き変わります。 ロールの作成 SPCSの各リソースの作成に必要な権限はこちらにあります。 ゴリ押ししただけなので間違っている可能性大です.. 行ったり来たりしたので足りないものがあるかもしれません。 use role ACCOUNTADMIN; CREATE ROLE IKUTY_CONTAINER_USER_ROLE; GRANT ROLE IKUTY_CONTAINER_USER_ROLE TO ROLE ACCOUNTADMIN; GRANT USAGE ON DATABASE IKUTY_DB TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT USAGE ON SCHEMA IKUTY_DB.PUBLIC TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT CREATE IMAGE REPOSITORY ON SCHEMA T_IKUTA_DB.PUBLIC TO ROLE IKUTY_CONTAINER_USER_ROLE; -- CREATE SERVICEに必要な権限 -- https://docs.snowflake.com/en/sql-reference/sql/create-service#access-control-requirements GRANT USAGE ON DATABASE IKUTY_DB TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT USAGE ON SCHEMA IKUTY_DB.PUBLIC TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT CREATE COMPUTE POOL ON ACCOUNT TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT CREATE IMAGE REPOSITORY ON SCHEMA IKUTY_DB.PUBLIC TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT CREATE SERVICE ON SCHEMA IKUTY_DB.PUBLIC TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT USAGE ON COMPUTE POOL IKUTY_SCS_POOL TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT IMPORTED PRIVILEGES ON DATABASE snowflake TO ROLE IKUTY_CONTAINER_USER_ROLE; -- GRANT READ ON STAGE IKUTY_SCS_STAGE TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT READ ON IMAGE REPOSITORY IKUTY_SCS_REPOSITORY TO ROLE IKUTY_CONTAINER_USER_ROLE; GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE IKUTY_CONTAINER_USER_ROLE; Image Repositoryの作成 SPCSで使用するイメージを配置するリポジトリを作成します。 AWSのECR、AzureのACR的に、dockerコマンドから透過的にpushできるようです。 公式は以下。 CREATE IMAGE REPOSITORY USE ROLE IKUTY_CONTAINER_USER_ROLE; CREATE OR REPLACE IMAGE REPOSITORY IKUTY_SCS_REPOSITORY; SHOW IMAGE REPOSITORIES; SHOW IMAGEを叩くと repository_url が返ってます。 Image Repositoryにプッシュする 作成したImage Repositoryにローカルで作成したイメージをpushしていきます。 pushは指定されたタグを送信するという仕様のため、docker tagコマンドでイメージにタグを付けます。 docker tagの仕様はこちら。 ローカルで以下を行います。(サニタイズのため分かりづらいですが補完してください..) # タグをつける # docker tag $ docker tag app_front:latest /app_front:scs $ docker tag app_api:latest /app_api:scs # Snowflake Image Repositoryにログインする $ docker login -u Login Succeeded # イメージをpushする $ docker push /app_front:scs ... $ docker push /app_api:scs ... Compute Poolを作成する Compute Poolを作成します。 CREATE COMPUTE POOL CREATE COMPUTE POOL ikuty_scs_pool MIN_NODES = 1 MAX_NODES = 1 INSTANCE_FAMILY = CPU_X64_XS AUTO_RESUME = TRUE INITIALLY_SUSPENDED = FALSE AUTO_SUSPEND_SECS = 3600 ; 以下でCREATEしたCompute poolをDESCRIBEできます。 DESCRIBE COMPUTE POOL 自分の環境だと、CREATE COMPUTE POOLしてから15分ほどステータスがSTARTINGでした。 15分ぐらいして叩くとステータスがACTIVEに変わりました。(結構かかるイメージ) 以下、公式の実行例です。 DESCRIBE ikuty_scs_pool +-----------------------+--------+-----------+-----------+-----------------+--------------+----------+-------------------+-------------+--------------+------------+-------------------------------+-------------------------------+-------------------------------+--------------+---------+ | name | state | min_nodes | max_nodes | instance_family | num_services | num_jobs | auto_suspend_secs | auto_resume | active_nodes | idle_nodes | created_on | resumed_on | updated_on | owner | comment | |-----------------------+--------+-----------+-----------+-----------------+--------------+----------+-------------------+-------------+--------------+------------+-------------------------------+-------------------------------+-------------------------------+--------------+---------| | IKUTY_SCS_POOL | ACTIVE | 1 | 1 | CPU_X64_XS | 1 | 0 | 0 | false | 1 | 0 | 2023-05-01 11:42:20.323 -0700 | 2023-05-01 11:42:20.326 -0700 | 2023-08-27 17:35:52.761 -0700 | ACCOUNTADMIN | NULL | +-----------------------+--------+-----------+-----------+-----------------+--------------+----------+-------------------+-------------+--------------+------------+-------------------------------+-------------------------------+-------------------------------+--------------+---------+ Serviceの作成 フロント側、サーバ側の2つのServiceを作成していきます。 specについて、ステージにファイルを配置してそれを指定するスタイルのほかに、 以下のようにCREATE SERVICEに含めるスタイルがあるようです。 CREATE SERVICE フロント側のServiceは以下です。 CREATE SERVICE ikuty_api_service IN COMPUTE POOL ikuty_scs_pool FROM SPECIFICATION $$ spec: containers: - name: api-container image: endpoints: - name: api port: 8080 $$ ; サーバ側のServiceは以下です。 CREATE SERVICE ikuty_front_service IN COMPUTE POOL ikuty_scs_pool FROM SPECIFICATION $$ spec: containers: - name: front-container image: endpoints: - name: front port: 5173 public: true $$ ; SERVICE用のシステム関数 SaaSで動くコンテナの動作を確認するのは結構面倒なことなのかなと思います。 自分の操作に対してSaaS側で何が行われているのか知りたいことは結構あるのかなと思います。 SPCSには以下のコマンドがあるようです。 SYSTEM$GET_SERVICE_STATUS SYSTEM$GET_SERVICE_LOGS エンドポイントURLの取得 SHOW ENDPOINTS すると、Specで指定したpublicなendpointを得られました。 ingress_url に なんとかかんとか.snowflakecomputing.app というURLが入っています。 SHOW ENDPOINTS 動作確認 Computing poolのStatusがACTIVEになってから、エンドポイントURLをブラウザで開くと、 期待通り、Reactで作ったHello Worldアプリが表示されます。 SYSEM$GET_SERVICE_LOGS()でフロントサービスのログを覗くと、viteの起動ログが出ていました。 そうえいば 5173 を 80 に mapping する記述をどこにもしていないのですが、そうなっています。 > front@0.0.0 dev > vite VITE v4.5.2 ready in 314 ms ➜ Local: http://localhost:5173/ ➜ Network: http://10.244.1.3:5173/ ➜ Network: http://172.16.0.6:5173/ 同様に、サーバ側のログを覗くと、uvicornの起動ログが出ていました。 ViteのProxyから8080で繋がるので、こちらは8080が開いています。 INFO: Will watch for changes in these directories: [\'/app\'] INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit) INFO: Started reloader process [1] using StatReload INFO: Started server process [8] INFO: Waiting for application startup. INFO: Application startup complete. まとめ SnowflakeをWebサーバのインフラにするだけの内容で正直意味がないです。 しかし、APIでSnowflakeに触ったり、Reactで格好良い可視化をしたり、夢は広がります。 FastAPI,React,TypeScriptの恩恵ゼロなので、今後ちょっと凝ったものを作ってみます。
標本の標準偏差とルートnの法則
[mathjax] 平均(mu)、標準偏差(sigma)からなる母集団から標本を取り出したとき、 標本の平均は母集団の平均(mu)に収束する。 では、もう一つの統計量である標準偏差はどうか。 意外と簡単にわかるようなのでまとめてみる。 誤差伝播法則 まず、下準備として、加法の誤差の見積もりについて。 今、(M_1)というサンプルが誤差(epsilon_1)、(M_2)というサンプルが誤差(epsilon_2)を持つとする。 つまり、それぞれ(M_1pmepsilon_1)、(M_2pmepsilon_2)。 その上で、((M_1pmepsilon_1) pm (M_2pmepsilon_2) ) について誤差の項をどう見積れるか、という話。 例えば以下の関係があったとき、 begin{eqnarray} z &=& f(x,y) end{eqnarray} 以下とすると、 begin{eqnarray} x &=& x_0 pm e_x \\ y &=& y_0 pm e_y end{eqnarray} (z)は、以下のようになる。 begin{eqnarray} z &=& z_0 pm e_z end{eqnarray} ここで(e_z)は以下となる(公式)。 偏微分とか何年振りだよ..と、思うがなんとなく確率の式より把握しやすい。 begin{eqnarray} e_z = sqrt{left( frac{partial f}{partial x} right)^2 e_x^2 + left( frac{partial f}{partial y} right)^2 e_y^2} end{eqnarray} 最初のサンプルと誤差を上記に入れてみると、 begin{eqnarray} sigma &=& sqrt{left( frac{partial (M_1+M_2)}{partial M_1} epsilon_1 right)^2 + left( frac{partial (M_1+M_2)}{partial M_2} epsilon_2 right)^2} = sqrt{ epsilon_1^2 + epsilon_2^2 } end{eqnarray} 両辺2乗して、 begin{eqnarray} sigma^2 &=& left( frac{partial (M_1+M_2)}{partial M_1} epsilon_1 right)^2 + left( frac{partial (M_1+M_2)}{partial M_2} epsilon_2 right)^2 = epsilon_1^2 + epsilon_2^2 end{eqnarray} ここから一番最初に戻ると、 begin{eqnarray} (M_1 pm epsilon_1) pm (M_2 pm epsilon_2) end{eqnarray} 上の誤差伝播式から以下が導かれる。 誤差項は以下の通りとなる様子。 begin{eqnarray} (M_1 pm M_2 ) pm sqrt{( epsilon_1^2 + epsilon_2^2 )} end{eqnarray} [arst_adsense slotnumber=\"1\"] ルートnの法則 母集団から(N)個のサンプルを取り出したときの平均は以下の通り。 begin{eqnarray} bar{x} = frac{x_1+x_2+cdots+x_N}{N} end{eqnarray} どの(x_i)も同じ母集団から取り出したサンプルなので、 それぞれの標準偏差は以下の通り全て同じ。 begin{eqnarray} sigma_1 = sigma_2 = cdots = sigma_N = sigma end{eqnarray} (bar{x})は真の値に誤差を加算した値であるが、誤差項は誤差伝播法則から以下の通りとなる。 begin{eqnarray} sqrt{sigma_1^2 + sigma_2^2 + cdots + sigma_N^2} = sqrt{sigma^2 + sigma^2 + cdots + sigma^2} = sqrt{N}sigma end{eqnarray} サンプル1個あたりの誤差、つまり標準偏差は、 begin{eqnarray} frac{sqrt{N}sigma}{N} = frac{sigma}{sqrt{N}} end{eqnarray} まとめ 平均(mu)、標準偏差(sigma)からなる母集団から標本を取り出したとき、 標本の平均は母集団の平均(mu)と等しい。 標本の標準偏差は( frac{sigma}{sqrt{N}} )である。 特に、標準偏差が(1/sqrt{N})倍となり、母集団と比較してより狭い範囲に値が集中する。 [arst_adsense slotnumber=\"1\"]
Fellegi-Sunterモデルに基づく確率的名寄せパッケージ Splinkを試してみる
[mathjax] Record Linkage、Entity Recognitionなど、いわゆる「名寄せ」問題には、割とさまざまな解決策が 提案されている。その1つに確率論的な数学的背景を持つFellegi-Sunterモデルを実装したSplinkがある。 数学的背景について咀嚼して説明してくれているサイトはあまりなく、Splinkの公式ドキュメントが唯一 に近いソースだろうと思う。 今回は5万人の人物データを使って重複を除去するSplinkの主要な機能のサンプルを追ってみた。 数式を読んだところで実りは薄い、ということもあり、なるべく数式を読まずに読み進めたが、 モデルのパラメタ、予測結果を解釈する際にどうしても数式を読まないとわからない箇所があった。 特にベイズ統計について理解がかなり浅くて、正直なところ理解しきれてはいない。 Splinkの公式ドキュメントを読み進めていった際に気付いたメモを書いてみることにした。 [arst_toc tag=\"h4\"] 環境構築・準備 環境構築の実装部分は主な学習要素ではないのでClaude Codeでサクッと作ってみた。 まずはPlan modeで計画を立ててもらう。これぐらいだとほぼ言い値で使える。 DuckDB、Splinkを使う最小構成の環境を作ります。環境はDocker composeで構築します。計画を立ててください。 ⏺ DuckDB、Splinkを使うDocker Compose環境の計画を完了しました。 計画概要 1. 設計文書: .tmp/design.mdで要件、アーキテクチャ、技術仕様を定義 2. タスク管理: .tmp/task.mdで詳細な実装手順を3フェーズに分けて整理 主要コンポーネント - Jupyter Lab環境: DuckDB、Splink、必要なライブラリがプリインストール - データボリューム: サンプルデータ、notebook、出力用のディレクトリ - 最小構成: 開発・学習用途に特化した軽量環境 次のステップ 実装を開始する場合は、Phase 1から順番に進めます: 1. Docker設定ファイル作成 2. ディレクトリ構造構築 3. サンプル実装 4. テストと検証 計画の詳細は.tmp/design.mdと.tmp/task.mdをご確認ください。実装を開始しますか? 実際にファイルを作ってもらうと以下のような構成となる。 得られる中間成果物である docker-compose.yml、Dockerfile は本記事では省略する。 . ├── data ├── docker-compose.yml ├── Dockerfile ├── notebooks ├── output └── requirements.txt 普通に http://localhost:8888 で JupyterLab が開く。 使用するサンプルデータ 5万人の人物データを使って名寄せを行うサンプル。おそらくSplinkの用途として最初に思いつくやつ。 Splinkにデータをロードする前に必要なデータクリーニング手順について説明がある。 公式によると、まずは行に一意のIDを割り当てる必要がある。 データセット内で一意となるIDであって、重複除去した後のエンティティを識別するIDのことではない。 [clink implicit=\"false\" url=\"https://moj-analytical-services.github.io/splink/demos/tutorials/01_Prerequisites.html\" imgurl=\"https://user-images.githubusercontent.com/7570107/85285114-3969ac00-b488-11ea-88ff-5fca1b34af1f.png\" title=\"Data Prerequisites\" excerpt=\"Splink では、リンクする前にデータをクリーンアップし、行に一意の ID を割り当てる必要があります。このセクションでは、Splink にデータをロードする前に必要な追加のデータクリーニング手順について説明します。\"] 使用するサンプルデータは以下の通り。 from splink import splink_datasets df = splink_datasets.historical_50k df.head() データの分布を可視化 splink.exploratoryのprofile_columnsを使って分布を可視化してみる。 from splink import DuckDBAPI from splink.exploratory import profile_columns db_api = DuckDBAPI() profile_columns(df, db_api, column_expressions=[\"first_name\", \"substr(surname,1,2)\"]) 同じ姓・名の人が大量にいることがわかる。 ブロッキングとブロッキングルールの評価 テーブル内のレコードが他のレコードと「同一かどうか」を調べるためには、 基本的には、他のすべてのレコードとの何らかの比較操作を行うこととなる。 全てのレコードについて全てのカラム同士を比較したいのなら、 対象のテーブルをCROSS JOINした結果、各カラム同士を比較することとなる。 SELECT ... FROM input_tables as l CROSS JOIN input_tables as r あるカラムが条件に合わなければ、もうその先は見ても意味がない、 というケースは多い。例えば、まず first_name 、surname が同じでなければ、 その先の比較を行わない、というのはあり得る。 SELECT ... FROM input_tables as l INNER JOIN input_tables as r ON l.first_name = r.first_name AND l.surname = r.surname このような考え方をブロッキング、ON句の条件をブロッキングルールと言う。 ただ、これだと性と名が完全一致していないレコードが残らない。 そこで、ブロッキングルールを複数定義し、いずれかが真であれば残すことができる。 ここでポイントなのが、ブロッキングルールを複数定義したとき、 それぞれのブロッキングルールで重複して選ばれるレコードが発生した場合、 Splinkが自動的に排除してくれる。 このため、ブロッキングルールを重ねがけすると、最終的に残るレコード数は一致する。 ただ、順番により、同じルールで残るレコード数は変化する。 逆に言うと、ブロッキングルールを足すことで、重複除去後のOR条件が増えていく。 積算グラフにして、ブロッキングルールとその順番の効果を見ることができる。 from splink import DuckDBAPI, block_on from splink.blocking_analysis import ( cumulative_comparisons_to_be_scored_from_blocking_rules_chart, ) blocking_rules = [ block_on(\"substr(first_name,1,3)\", \"substr(surname,1,4)\"), block_on(\"surname\", \"dob\"), block_on(\"first_name\", \"dob\"), block_on(\"postcode_fake\", \"first_name\"), block_on(\"postcode_fake\", \"surname\"), block_on(\"dob\", \"birth_place\"), block_on(\"substr(postcode_fake,1,3)\", \"dob\"), block_on(\"substr(postcode_fake,1,3)\", \"first_name\"), block_on(\"substr(postcode_fake,1,3)\", \"surname\"), block_on(\"substr(first_name,1,2)\", \"substr(surname,1,2)\", \"substr(dob,1,4)\"), ] db_api = DuckDBAPI() cumulative_comparisons_to_be_scored_from_blocking_rules_chart( table_or_tables=df, blocking_rules=blocking_rules, db_api=db_api, link_type=\"dedupe_only\", ) 積算グラフは以下の通り。積み上がっている数値は「比較の数」。 要は、論理和で条件を足していって、次第に緩和されている様子がわかる。 DuckDBでは比較の数を2,000万件以内、Athena,Sparkでは1億件以内を目安にせよとのこと。 比較の定義 Splinkは Fellegi-Sunter model モデル (というかフレームワーク) に基づいている。 https://moj-analytical-services.github.io/splink/topic_guides/theory/fellegi_sunter.html 各カラムの同士をカラムの特性に応じた距離を使って比較し、重みを計算していく。 各カラムの比較に使うためのメソッドが予め用意されているので、特性に応じて選んでいく。 以下では、first_name, sur_name に ForenameSurnameComparison が使われている。 dobにDateOfBirthComparison、birth_place、ocupationにExactMatchが使われている。 import splink.comparison_library as cl from splink import Linker, SettingsCreator settings = SettingsCreator( link_type=\"dedupe_only\", blocking_rules_to_generate_predictions=blocking_rules, comparisons=[ cl.ForenameSurnameComparison( \"first_name\", \"surname\", forename_surname_concat_col_name=\"first_name_surname_concat\", ), cl.DateOfBirthComparison( \"dob\", input_is_string=True ), cl.PostcodeComparison(\"postcode_fake\"), cl.ExactMatch(\"birth_place\").configure(term_frequency_adjustments=True), cl.ExactMatch(\"occupation\").configure(term_frequency_adjustments=True), ], retain_intermediate_calculation_columns=True, ) # Needed to apply term frequencies to first+surname comparison df[\"first_name_surname_concat\"] = df[\"first_name\"] + \" \" + df[\"surname\"] linker = Linker(df, settings, db_api=db_api) ComparisonとComparison Level ここでSplinkツール内の比較の概念の説明。以下の通り概念に名前がついている。 Data Linking Model ├─-- Comparison: Date of birth │ ├─-- ComparisonLevel: Exact match │ ├─-- ComparisonLevel: One character difference │ ├─-- ComparisonLevel: All other ├─-- Comparison: First name │ ├─-- ComparisonLevel: Exact match on first_name │ ├─-- ComparisonLevel: first_names have JaroWinklerSimilarity > 0.95 │ ├─-- ComparisonLevel: first_names have JaroWinklerSimilarity > 0.8 │ ├─-- ComparisonLevel: All other モデルのパラメタ推定 モデルの実行に必要なパラメタは以下の3つ。Splinkを用いてパラメタを得る。 ちなみに u は \"\'U\'nmatch\"、m は \"\'M\'atch\"。背後の数式の説明で現れる。 No パラメタ 説明 1 無作為に選んだレコードが一致する確率 入力データからランダムに取得した2つのレコードが一致する確率 (通常は非常に小さい数値) 2 u値(u確率) 実際には一致しないレコードの中で各 ComparisonLevel に該当するレコードの割合。具体的には、レコード同士が同じエンティティを表すにも関わらず値が異なる確率。例えば、同じ人なのにレコードによって生年月日が違う確率。これは端的には「データ品質」を表す。名前であればタイプミス、別名、ニックネーム、ミドルネーム、結婚後の姓など。 3 m値(m確率) 実際に一致するレコードの中で各 ComparisonLevel に該当するレコードの割合。具体的には、レコード同士が異なるエンティティを表すにも関わらず値が同じである確率。例えば別人なのにレコードによって性・名が同じ確率 (同姓同名)。性別は男か女かしかないので別人でも50%の確率で一致してしまう。 無作為に選んだレコードが一致する確率 入力データからランダムに抽出した2つのレコードが一致する確率を求める。 値は0.000136。すべての可能なレコードのペア比較のうち7,362.31組に1組が一致すると予想される。 合計1,279,041,753組の比較が可能なため、一致するペアは合計で約173,728.33組になると予想される、 とのこと。 linker.training.estimate_probability_two_random_records_match( [ block_on(\"first_name\", \"surname\", \"dob\"), block_on(\"substr(first_name,1,2)\", \"surname\", \"substr(postcode_fake,1,2)\"), block_on(\"dob\", \"postcode_fake\"), ], recall=0.6, ) > Probability two random records match is estimated to be 0.000136. > This means that amongst all possible pairwise record comparisons, > one in 7,362.31 are expected to match. > With 1,279,041,753 total possible comparisons, > we expect a total of around 173,728.33 matching pairs u確率の推定 実際には一致しないレコードの中でComparisonの評価結果がPositiveである確率。 基本、無作為に抽出したレコードは一致しないため、「無作為に抽出したレコード」を 「実際には一致しないレコード」として扱える、という点がミソ。 probability_two_random_records_match によって得られた値を使ってu確率を求める。 estimate_u_using_random_sampling によって、ラベルなし、つまり教師なしでu確率を得られる。 レコードのペアをランダムでサンプルして上で定義したComparisonを評価する。 ランダムサンプルなので大量の不一致が発生するが、各Comparisonにおける不一致の分布を得ている。 これは、例えば性別について、50%が一致、50%が不一致である、という分布を得ている。 一方、例えば生年月日について、一致する確率は 1%、1 文字の違いがある確率は 3%、 その他はすべて 96% の確率で発生する、という分布を得ている。 linker.training.estimate_u_using_random_sampling(max_pairs=5e6) > ----- Estimating u probabilities using random sampling ----- > > Estimated u probabilities using random sampling > > Your model is not yet fully trained. Missing estimates for: > - first_name_surname (no m values are trained). > - dob (no m values are trained). > - postcode_fake (no m values are trained). > - birth_place (no m values are trained). > - occupation (no m values are trained). m確率の推定 「実際に一致するレコード」の中で、Comparisonの評価がNegativeになる確率。 そもそも、このモデルを使って名寄せ、つまり「一致するレコード」を見つけたいのだから、 モデルを作るために「実際に一致するレコード」を計算しなければならないのは矛盾では..となる。 無作為抽出結果から求められるu確率とは異なり、m確率を求めるのは難しい。 もしラベル付けされた「一致するレコード」、つまり教師データセットがあるのであれば、 そのデータセットを使ってm確率を求められる。 例えば、日本人全員にマイナンバーが振られて、全てのレコードにマイナンバーが振られている、 というアナザーワールドがあるのであれば、マイナンバーを使ってm確率を推定する。(どういう状況??) ラベル付けされたデータがないのであれば、EMアルゴリズムでm確率を求めることになっている。 EMアルゴリズムは反復的な手法で、メモリや収束速度の点でペア数を減らす必要があり、 例ではブロッキングルールを設定している。 以下のケースでは、first_nameとsurnameをブロッキングルールとしている。 つまり、first_name, surnameが完全に一致するレコードについてペア比較を行う。 この仮定を設定したため、first_name, surname (first_name_surname) のパラメタを推定できない。 training_blocking_rule = block_on(\"first_name\", \"surname\") training_session_names = ( linker.training.estimate_parameters_using_expectation_maximisation( training_blocking_rule, estimate_without_term_frequencies=True ) ) > ----- Starting EM training session ----- > > Estimating the m probabilities of the model by blocking on: > (l.\"first_name\" = r.\"first_name\") AND (l.\"surname\" = r.\"surname\") > > Parameter estimates will be made for the following comparison(s): > - dob > - postcode_fake > - birth_place > - occupation > > Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: > - first_name_surname > > Iteration 1: Largest change in params was 0.248 in probability_two_random_records_match > Iteration 2: Largest change in params was 0.0929 in probability_two_random_records_match > Iteration 3: Largest change in params was -0.0237 in the m_probability of birth_place, level `Exact match on > birth_place` > Iteration 4: Largest change in params was 0.00961 in the m_probability of birth_place, level `All other >comparisons` > Iteration 5: Largest change in params was -0.00457 in the m_probability of birth_place, level `Exact match on birth_place` > Iteration 6: Largest change in params was -0.00256 in the m_probability of birth_place, level `Exact match on birth_place` > Iteration 7: Largest change in params was 0.00171 in the m_probability of dob, level `Abs date difference Iteration 8: Largest change in params was 0.00115 in the m_probability of dob, level `Abs date difference Iteration 9: Largest change in params was 0.000759 in the m_probability of dob, level `Abs date difference Iteration 10: Largest change in params was 0.000498 in the m_probability of dob, level `Abs date difference Iteration 11: Largest change in params was 0.000326 in the m_probability of dob, level `Abs date difference Iteration 12: Largest change in params was 0.000213 in the m_probability of dob, level `Abs date difference Iteration 13: Largest change in params was 0.000139 in the m_probability of dob, level `Abs date difference Iteration 14: Largest change in params was 9.04e-05 in the m_probability of dob, level `Abs date difference <= 10 year` 同様にdobをブロッキングルールに設定して実行すると、dob以外の列についてパラメタを推定できる。 training_blocking_rule = block_on(\"dob\") training_session_dob = ( linker.training.estimate_parameters_using_expectation_maximisation( training_blocking_rule, estimate_without_term_frequencies=True ) ) > ----- Starting EM training session ----- > > Estimating the m probabilities of the model by blocking on: > l.\"dob\" = r.\"dob\" > > Parameter estimates will be made for the following comparison(s): > - first_name_surname > - postcode_fake > - birth_place > - occupation > > Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: > - dob > > Iteration 1: Largest change in params was -0.474 in the m_probability of first_name_surname, level `Exact match on first_name_surname_concat` > Iteration 2: Largest change in params was 0.052 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 3: Largest change in params was 0.0174 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 4: Largest change in params was 0.00532 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 5: Largest change in params was 0.00165 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 6: Largest change in params was 0.00052 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 7: Largest change in params was 0.000165 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 8: Largest change in params was 5.29e-05 in the m_probability of first_name_surname, level `All other comparisons` > > EM converged after 8 iterations > > Your model is not yet fully trained. Missing estimates for: > - first_name_surname (some u values are not trained). モデルパラメタの可視化 m確率、u確率の可視化。 マッチウェイトの可視化。マッチウェイトは (log_2 (m / u))で計算される。 linker.visualisations.match_weights_chart() モデルの保存と読み込み 以下でモデルを保存できる。 settings = linker.misc.save_model_to_json( \"./saved_model_from_demo.json\", overwrite=True ) 以下で保存したモデルを読み込める。 import json settings = json.load( open(\'./saved_model_from_demo.json\', \'r\') ) リンクするのに十分な情報が含まれていないレコード 「John Smith」のみを含み、他のすべてのフィールドがnullであるレコードは、 他のレコードにリンクされている可能性もあるが、潜在的なリンクを明確にするには十分な情報がない。 以下により可視化できる。 linker.evaluation.unlinkables_chart() 横軸は「マッチウェイトの閾値」。縦軸は「リンクするのに十分な情報が含まれないレコード」の割合。 マッチウェイト閾値=6.11ぐらいのところを見ると、入力データセットのレコードの約1.3%が リンクできないことが示唆される。 訓練済みモデルを使って未知データのマッチウェイトを予測 上で構築した推定モデルを使用し、どのペア比較が一致するかを予測する。 内部的には以下を行うとのこと。 blocking_rules_to_generate_predictionsの少なくとも1つと一致するペア比較を生成 Comparisonで指定されたルールを使用して、入力データの類似性を評価 推定された一致重みを使用し、要求に応じて用語頻度調整を適用して、最終的な一致重みと一致確率スコアを生成 df_predictions = linker.inference.predict(threshold_match_probability=0.2) df_predictions.as_pandas_dataframe(limit=1) > Blocking time: 0.88 seconds > Predict time: 1.91 seconds > > -- WARNING -- > You have called predict(), but there are some parameter estimates which have neither been estimated or > specified in your settings dictionary. To produce predictions the following untrained trained parameters will > use default values. > Comparison: \'first_name_surname\': > u values not fully trained records_to_plot = df_e.to_dict(orient=\"records\") linker.visualisations.waterfall_chart(records_to_plot, filter_nulls=False) predictしたマッチウェイトの可視化、数式との照合 predictしたマッチウェイトは、ウォーターフォール図で可視化できる。 マッチウェイトは、モデル内の各特徴量によって一致の証拠がどの程度提供されるかを示す中心的な指標。 (lambda)は無作為抽出した2つのレコードが一致する確率。(K=m/u)はベイズ因子。 begin{align} M &= log_2 ( frac{lambda}{1-lambda} ) + log_2 K \\ &= log_2 ( frac{lambda}{1-lambda} ) + log_2 m - log_2 u end{align} 異なる列の比較が互いに独立しているという仮定を置いていて、 2つのレコードのベイズ係数が各列比較のベイズ係数の積として扱う。 begin{eqnarray} K_{feature} = K_{first_name_surname} + K_{dob} + K_{postcode_fake} + K_{birth_place} + K_{occupation} + cdots end{eqnarray} マッチウェイトは以下の和。 begin{eqnarray} M_{observe} = M_{prior} + M_{feature} end{eqnarray} ここで begin{align} M_{prior} &= log_2 (frac{lambda}{1-lambda}) \\ M_{feature} &= M_{first_name_surname} + M_{dob} + M_{postcode_fake} + M_{birth_place} + M_{occupation} + cdots end{align} 以下のように書き換える。 begin{align} M_{observe} &= log_2 (frac{lambda}{1-lambda}) + sum_i^{feature} log_2 (frac{m_i}{u_i}) \\ &= log_2 (frac{lambda}{1-lambda}) + log_2 (prod_i^{feature} (frac{m_i}{u_i}) ) end{align} ウォーターフォール図の一番左、赤いバーは(M_{prior} = log_2 (frac{lambda}{1-lambda}))。 特徴に関する追加の知識が考慮されていない場合のマッチウェイト。 横に並んでいる薄い緑のバーは (M_{first_name_surname} + M_{dob} + M_{postcode_fake} + M_{birth_place} + M_{occupation} + cdots)。 各特徴量のマッチウェイト。 一番右の濃い緑のバーは2つのレコードの合計マッチウェイト。 begin{align} M_{feature} &= M_{first_name_surname} + M_{dob} + M_{postcode_fake} + M_{birth_place} + M_{occupation} + cdots \\ &= 8.50w end{align} まとめ 長くなったのでいったん終了。この記事では教師なし確率的名寄せパッケージSplinkを使用してモデルを作ってみた。 次の記事では、作ったモデルを使用して実際に名寄せをしてみる。 途中、DuckDBが楽しいことに気づいたので、DuckDBだけで何個か記事にしてみようと思う。
正規分布に従う確率変数の二乗和はカイ二乗分布に従うことの証明
[mathjax] 母平均(mu)、標準偏差(sigma)の正規分布から(n)個の標本を無作為抽出したとき、 (n)個の標本について二乗和(V)を計算した場合(V)はどのような分布をするか。 begin{eqnarray} V = x_1^2 + x_2^2 + cdots + x_n^2 end{eqnarray} (V)の分布は自由度nのカイ二乗分布になる。 なお、実際にデータを表示してみた記事は以下。 [clink url=\"https://ikuty.com/2019/08/12/chi-square-distribution_handson/\"] 証明の式変形が気持ち良いことで有名?なので1度やってみる。 証明が奇跡的だったのでまとめてみる 自由度(n)のカイ二乗分布の確率密度関数。 これでもかっ、というくらいにいろいろ乗っかってる。 begin{eqnarray} f_n(x) = frac{1}{2^{frac{n}{2}}Gamma({frac{n}{2}})}x^{frac{n}{2}-1}e^{-frac{x}{2}} end{eqnarray} 標準正規分布と同じ扱いで、 (x)に関する積分が1になるようにガンマ関数による定数項がついてる。 勢い以下のような見方になる。 begin{eqnarray} f_n(x) = left( frac{1}{2^{frac{n}{2}}Gamma({frac{n}{2}})} right) x^{frac{n}{2}-1} e^{-frac{x}{2}} end{eqnarray} [arst_adsense slotnumber=\"1\"] だから何なのか、と思うけども、一度は証明を見ておくと良し、という意見がある。 ド直球に、標準正規分布の確率密度関数から2乗和の分布を求めようとして、 奇跡的に上記の確率密度関数になってかなり面白かったのでまとめてみた。 (n=1)のときの証明 (X)が標準正規分布に従うときの確率密度関数は以下。 begin{eqnarray} f(x) = frac{1}{sqrt{2pi}}e^{-frac{x^2}{2}} end{eqnarray} このとき(X)の2乗の分布(Y=X^2)の分布を考えようとするとき、 (Yle y)となる確率(P(Yle y))は、 begin{eqnarray} P(Yle y) = P(-sqrt{y} le X le sqrt{y}) end{eqnarray} となるので、(Y)の確率分布関数(F(y))は、 begin{eqnarray} F(y) &=& int_{-sqrt{y}}^{sqrt{y}}f(x)dx \\ &=& 2 int_{0}^{sqrt{y}}f(x)dx end{eqnarray} (y=x^2)という変数変換をして微分すると、(frac{dy}{dx}=2x)から、(dy=2xdx=2sqrt{y}dx)。 これを使って書き直すと、(コレ考えたやつ頭おかしい...) begin{eqnarray} F(y) &=& 2int_{0}^{sqrt{y}}frac{1}{sqrt{2pi}}e^{-frac{x^2}{2}}dx \\ &=& 2 frac{1}{2} int_{0}^{sqrt{y}}frac{1}{sqrt{2pi}y}e^{-frac{x^2}{2}}dy \\ &=& int_{0}^{sqrt{y}}frac{1}{sqrt{2pi}y}e^{-frac{y}{2}}dy \\ &=& int_{0}^{sqrt{y}}frac{1}{2^{frac{1}{2}}sqrt{pi}}y^{-frac{1}{2}}e^{-frac{y}{2}}dy \\ end{eqnarray} ガンマ関数(Gamma(n))って何だっけ...、というところで力尽きた。 (Gamma(frac{1}{2}))だけ複素数にならず(sqrt{pi})になる。 (F(y))をガンマ関数を入れて書き直すと、 begin{eqnarray} F(y) = int_{0}^{sqrt{y}}frac{1}{2^{frac{1}{2}}Gamma(frac{1}{2})}y^{-frac{1}{2}}e^{-frac{y}{2}}dy \\ end{eqnarray} この式は奇跡的に(n=1)のとき、カイ二乗分布の確率密度関数になってる。 begin{eqnarray} f_1(x) &=& frac{1}{2^{frac{1}{2}}Gamma({frac{1}{2}})}x^{frac{1}{2}-1}e^{-frac{x}{2}} end{eqnarray} [arst_adsense slotnumber=\"1\"] (n ge 2)のときの証明 数学的帰納法で証明する。このワード、何年振りだろうか...。 Wikipediaによると、 数学的帰納法(すうがくてききのうほう、英: mathematical induction)は自然数に関する命題 P(n) が全ての自然数 n に対して成り立っている事を証明するための、次のような証明手法である。 P(1) が成り立つ事を示す。 任意の自然数 k に対して、「P(k) ⇒ P(k + 1)」が成り立つ事を示す。 以上の議論から任意の自然数 n について P(n) が成り立つ事を結論づける。 準備として、確率密度関数の畳み込みについて。 2つの確率変数(X_1)、(X_2)が互いに独立に標準正規分布に従い、 (Y_1=X_1^2)、(Y_2=X_2^2)とおいたとき、(Z=Y_1+Y_2)が従う確率密度関数を求める。 確率変数(Y_1)、(Y_2)双方とも、確率密度関数(h_1(x))に従うときは、 (x=y_1+y_2, y_1,y_2 ge 0, z ge 0)に注意して、 以下を計算することで確率変数(Z=Y_1+Y_2=X_1^2+X_2^2)が従う確率密度関数が求まる。 begin{eqnarray} h_2(x) = int_0^{z}h_1(y)h_1(z-y)dy end{eqnarray} (P(1))は既に示されている。任意の自然数 (n) に対して、「(P(n) ⇒ P(n + 1))」が成り立つ事を示す。 (Y=X_1^2+X_2^2+cdots+X_{n-1}^2)が自由度(n-1)のカイ二乗分布に従い、 (X_n^2)が自由度(1)のカイ二乗分布に従うとき、(Y+X_n)が自由度(n)のカイ二乗分布に従うことを示す。 示すのは以下。 begin{eqnarray} f_n(x) = int_{0}^{x}f_{n-1}(t)f_1(x-t)dt end{eqnarray} 右辺を展開していく。 begin{eqnarray} int_{0}^{x} frac{1}{2^{frac{n-1}{2}}Gamma(frac{n-1}{2})}t^{frac{n-3}{2}} e^{-frac{x}{2}} cdot frac{1}{2^{frac{1}{2}}Gamma({frac{1}{2})}}t^{-frac{1}{2}}e^{-frac{x}{2}} end{eqnarray} (t)に対する定数項を積分の外に出せる。 begin{eqnarray} frac{e^{-frac{x}{2}}}{2^{frac{n}{2}}Gamma(frac{n-1}{2})sqrt{pi}} int_{0}^{x}t^{frac{n-3}{2}}(x-t)^{-frac{1}{2}}dt end{eqnarray} ここで(u=frac{t}{x})とおくと、(frac{du}{dt}=frac{1}{x})だから、(dt=xdu)。 変数を置き換える。奇跡的に(x)が積分の外に出る。 begin{eqnarray} frac{e^{-frac{x}{2}}}{2^{frac{n}{2}}Gamma(frac{n-1}{2})sqrt{pi}} int_{0}^{1}(ux)^{frac{n-3}{2}}(x-ux)^{frac{1}{2}}xdu \\ = frac{e^{-frac{x}{2}}}{2^{frac{n}{2}}Gamma(frac{n-1}{2})sqrt{pi}} int_{0}^{1}x^{frac{n-3}{2}} u^{frac{n-3}{2}} x^{frac{1}{2}}(1-u)^{frac{1}{2}}xdu \\ = frac{e^{-frac{x}{2}}x^{frac{n-3}{2}-frac{1}{2}+1}}{2^{frac{n}{2}}Gamma(frac{n-1}{2})Gamma(frac{1}{2})} int_{0}^{1} u^{frac{n-3}{2}}(1-u)^{-frac{1}{2}}du end{eqnarray} 積分の部分は、昔みた覚えがあるけど、もう力尽きたので結論だけ... 以下の関係式があって、 begin{eqnarray} B(p,q) &=& int_{0}^{1} x^{p-1}(1-x)^{q-1}dx \\ &=& frac{Gamma(p)Gamma(q)}{Gamma(p+q)} end{eqnarray} (p,q)を以下のように選ぶと、 begin{eqnarray} B(frac{n-1}{2},frac{1}{2}) = frac{Gamma(frac{n-1}{2})Gamma(frac{1}{2})}{Gamma(frac{n}{2})} end{eqnarray} これを使って式を書き直すと、一気に約分されて自由度(n)のカイ二乗分布の式が現れる。 begin{eqnarray} frac{e^{-frac{x}{2}}x^{frac{n-3}{2}-frac{1}{2}+1}}{2^{frac{n}{2}}Gamma(frac{n-1}{2})Gamma(frac{1}{2})} int_{0}^{1} u^{frac{n-3}{2}}(1-u)^{-frac{1}{2}}du \\ = frac{e^{-frac{x}{2}}x^{frac{n-3}{2}-frac{1}{2}+1}}{2^{frac{n}{2}}Gamma(frac{n-1}{2})Gamma(frac{1}{2})} frac{Gamma(frac{n-1}{2})Gamma(frac{1}{2})}{Gamma(frac{n}{2})} \\ = frac{1}{2^{frac{n}{2}}Gamma({frac{n}{2}})}x^{frac{n}{2}-1}e^{-frac{x}{2}} \\ = f_n(x) end{eqnarray} Q.E.D.!! あぁ、これは気持ち良い。 [arst_adsense slotnumber=\"1\"]
