React+Next.jsでDummy JSONのCRUDをCSR/SSRの両方で作成して違いを調べてみた話
Next.jsでCRUDを作りSSRの挙動を調べてみた。いったんPage Routerを選択。 バックエンドとなるAPIとしてDummyJSONのPosts-Docs APIを使用した。 一覧、詳細、更新、削除が用意される。ただし更新、削除はダミーで永続化されない。 [clink implicit=\"false\" url=\"https://dummyjson.com/docs/posts\" imgurl=\"https://dummyjson.com/public/img/hero-image.svg\" title=\"Free Fake REST API for Placeholder JSON Data\" excerpt=\"Develop, Build, Test.Get instant dummy JSON data for your frontend with DummyJSON Server — no backend setup needed!\"] 目次は以下。 [arst_toc tag=\"h4\"] 構成 CSR版/SSR版の2パターンについてCRUDを行うアプリをClaude(Sonnet4.5)で環境を構築した。 ルーティングについては今回の調査範囲外のため、いったんシンプルなPage Routerを使用した。 npm run dev で next dev --turbopack が動く何かが作られた。turbopackはrust製のwebpackの後継。 いったん実行環境の詳細な把握をスキップして上物の理解を進めることにする。上物の構成は以下。 . ├── app/ │ ├── page.tsx # ホームページ │ └── posts/ │ ├── page.tsx # 投稿一覧選択(CSR/SSR) │ ├── csr/ │ │ ├── page.tsx # 投稿一覧(Client Component版) │ │ ├── [id]/ │ │ │ ├── page.tsx # 投稿詳細(Client Component版) │ │ │ └── edit/ │ │ │ └── page.tsx # 投稿編集(Client Component版) │ │ └── new/ │ │ └── page.tsx # 新規投稿作成(Client Component版) │ ├── ssr/ │ │ ├── page.tsx # 投稿一覧(Server Component版) │ │ ├── [id]/ │ │ │ ├── page.tsx # 投稿詳細(Server Component版) │ │ │ └── edit/ │ │ │ └── page.tsx # 投稿編集(Server Actions版) │ │ └── new/ │ │ └── page.tsx # 新規投稿作成(Server Actions版) │ └── _components/ │ └── DeleteButton.tsx # 削除ボタン(Client Component) ├── lib/ │ └── api.ts # API関数 ├── types/ │ └── post.ts # 型定義 ├── Dockerfile # Dockerイメージ設定 ├── docker-compose.yml # Docker Compose設定 └── next.config.ts # Next.js設定 以下のようなアプリができた。 [video width=\"2280\" height=\"1792\" webm=\"https://ikuty.com/wp-content/uploads/2025/10/recording.webm\"][/video] ReactとNext.jsの関係性と役割分担 ReactはUIを作るためのJavaScriptライブラリとして機能する。コンポーネント、フック、JSXを提供する。 Next.jsはReactを使ったフレームワークであり、ルーティング、ビルド、最適化などの機能を提供する。 React (ライブラリ) Next.js (フレームワーク) 役割の焦点 UI (ユーザーインターフェース) 構築 Webアプリケーション全体の構築 主な提供物 コンポーネント (UI要素)、JSX、Hooks (状態管理など) ルーティング、レンダリング戦略、最適化機能、バックエンド連携 ルーティング 非搭載。別途React Routerなどのライブラリが必要。 ファイルベースルーティングが組み込みで提供される。 レンダリング クライアントサイドレンダリング (CSR) が基本。ブラウザでJavaScriptが実行されてUIを描画する。 プリレンダリング (SSR/SSG) が組み込みで提供される。レンディングのタイミングと場所を制御する。 データ取得 非搭載。fetch APIなどをコンポーネント内で使用する。 データ取得パターン (Server Components, Route Handlersなど) とキャッシュの仕組みが組み込みで提供される。 クライアントサイドレンダリング(CSR) ブラウザ側で動的にHTMLを生成する。useState、useEffect、イベントハンドラが使える。 Step1.初期レンダリング(サーバ側) app/layout.tsx がサーバーで実行される。<html><body>の枠組みを作る。 app/posts/[id]/page.tsxがClinet Componentとして認識され初期HTMLを作る。 ブラウザに初期HTMLを送り、ブラウザは初期HTMLを表示する // app/layout.tsx (20-34行目) export default function RootLayout({ children }: { children: React.ReactNode }) { return ( <html lang=\"en\"> <body> {children} {/* ← ここに子コンポーネントが入る */} </body> </html> ); } ... //ブラウザに送られる初期HTMLの例 <html> <body> <div class=\"min-h-screen p-8\"> <p>読み込み中...</p> ← loading=trueの状態 </div> <script src=\"/_next/...\"></script> ← クライアント用JS </body> </html> Step2.ハイドレーション(Hydration) JavaScriptが読み込まれる Reactがコンポーネントを「水分補給」(Hydrate)、HTMLに機能を追加 初期state: loading = true, post = null Step3.useEffectの実行(副作用) コンポーネントが画面に表示された直後に1回実行される。 // app/posts/[id]/page.tsx (16-18行目) useEffect(() => { loadPost(); // ← コンポーネントがマウントされたら実行 }, [params.id]); Step4.データフェッチとstate更新 api.getPost()を実行。-> fetch(\'https://dummyjson.com/posts/1\') const data でレスポンスを受け取る setPost(data)でstateを更新 setLoading(false)でローディング終了。loading=false // app/posts/[id]/page.tsx (20-32行目) const loadPost = async () => { try { setLoading(true); // ローディング表示 const data = await api.getPost(Number(params.id)); // API呼び出し setPost(data); // ← state更新! setError(null); } catch (err) { setError(\'投稿の読み込みに失敗しました\'); } finally { setLoading(false); // ← state更新! } }; Step5.再レンダリング(stateが変わったので) post stateが更新されたので再レンダリング 条件分岐を再評価 最終的なJSXをDOMに反映 // app/posts/[id]/page.tsx (47-55行目) if (loading) { // loading = false なので通過 return 読み込み中...; } if (error || !post) { // error = null, post = データあり なので通過 return エラー表示; } // ここが実行される! return ( <div className=\"min-h-screen p-8\"> <h1>{post.title}</h1> {/* ← post.title を表示 */} <p>{post.body}</p> {/* ← post.body を表示 */} {/* ... */} </div> ); Step6.リストの動的レンダリング // app/posts/[id]/page.tsx (133-140行目) {post.tags.map((tag, index) => ( // ← 配列をループ <span key={index} className=\"...\"> {tag} {/* ← 各タグを表示 */} </span> ))} 実行結果: post.tags = [\"history\", \"american\", \"crime\"] ↓ map() で変換 <span key={0}>history</span> <span key={1}>american</span> <span key={2}>crime</span> 全体のレンダリングフロー [ユーザーが /posts/1 にアクセス] ↓ ┌──────────────────────────────┐ │ サーバー側(Next.js Server) │ ├──────────────────────────────┤ │ 1. app/layout.tsx を実行 │ │ → を生成 │ │ │ │ 2. app/posts/[id]/page.tsx │ │ を「クライアントコンポーネント」│ │ として認識 │ │ → 初期HTML生成 │ │ (loading=true状態) │ └──────────────────────────────┘ ↓ HTML + JS送信 ┌──────────────────────────────┐ │ ブラウザ側(Client) │ ├──────────────────────────────┤ │ 3. HTMLを表示 │ │ 「読み込み中...」 │ │ │ │ 4. JavaScriptロード │ │ → Hydration実行 │ │ │ │ 5. useEffect発火 │ │ → loadPost()実行 │ │ │ │ 6. API呼び出し │ │ fetch(https://dummyjson.com/posts/1) │ ↓ │ │ レスポンス受信 │ │ │ │ 7. setState実行 │ │ setPost(data) │ │ setLoading(false) │ │ ↓ │ │ 8. 再レンダリング │ │ → 投稿詳細を表示 │ └──────────────────────────────┘ サーバーサイドレンダリング(SSR) サーバ側でHTMLが生成される。DBやAPIに直接アクセスできる。useState,useEffectを使わない。 例えば、当アプリにおいて / へのアクセスに対してNext.jsが app/page.tsx を実行する。 HTMLを生成してブラウザに送信し、ブラウザはHTMLを表示する。 1周回って戻ってきたというか、LaravelやRailsにフロントエンドから寄っていくスタイル。 バックエンドにAPIを用意せずDBを直接操作できるため、SPAが不要な簡易的な管理画面など、 大幅な工数削減が可能になると思う。 Laravel,Railsだと、フロントエンドの記述にVue/Reactを導入する必要があるため、 バックエンド・フロントエンド、という棲み分けが発生してしまうが、 Next.jsのSSR(+CSR混在)により、フロントエンドとバックエンドを同じ仕組みで実現できる点で 管理する対象が大幅に減るのもメリットだと思う。 import Link from \'next/link\'; import { api } from \'@/lib/api\'; import DeleteButton from \'@/app/posts/_components/DeleteButton\'; // Server Component(デフォルト) // \'use client\' ディレクティブがないため、サーバー側で実行される export default async function PostsPageSSR() { // サーバー側で直接データ取得 // useEffect や useState は不要 const data = await api.getPosts(); const posts = data.posts; return ( <div className=\"min-h-screen p-8 bg-gray-50\"> <div className=\"max-w-4xl mx-auto\"> {/* ヘッダー部分 */} <div className=\"bg-blue-50 border border-blue-200 rounded-lg p-4 mb-6\"> <p className=\"text-sm text-blue-800\"> <strong>Server Component版 - このページはサーバー側でレンダリングされ、HTMLに既にデータが含まれています </p> </div> ... ); } SSRとCSRの統合 SSRモードとCSRモードの2つのモードが存在する訳ではなく、SSRとCSRは同時に存在し得る。 例えば、今回作成したSSR版アプリの投稿一覧画面において、CSRで削除ボタンを実現している。 コンポーネント単位でSSR/CSRの分離が起こるだけで、アーキ全体ではSSRとCSRは混在できる。 TypeScriptにより型安全にpropsを渡せるし、状態管理がReactの仕組みで統一できる。 部分的な更新は可能 (router.refresh() )。 // app/posts/ssr/page.tsx (Server Component) export default async function PostsPageSSR() { const data = await api.getPosts(); // サーバー側で実行 const posts = data.posts; return ( <div> {posts.map(post => ( <div key={post.id}> <h2>{post.title}</h2> {/* Client Componentをそのまま埋め込める */} <DeleteButton postId={post.id} /> </div> ))} </div> ); } // app/posts/_components/DeleteButton.tsx (Client Component) \'use client\'; export default function DeleteButton({ postId }: { postId: number }) { const router = useRouter(); const handleDelete = async () => { if (!confirm(\'削除しますか?\')) return; await api.deletePost(postId); router.refresh(); // この部分だけ更新 }; return ( <button onClick={handleDelete}>削除</button> ); } まとめ Next.jsのHello WorldをしつつSSRとCSRの挙動を確認した。 フロント側フレームワークの枠組みを越え、フロント・バックエンドを統一的に扱えることを確認した。 アプリケーションの要件次第で、SSRを中心に部分的にCSRとすることで大幅な工数削減を期待できそう。
go-txdbを使ってgolang, gin, gorm(gen)+sqlite構成のAPI をテストケース毎に管理する
データベース依存のテストケースを作る際に、テストケース毎にDBがクリーンな状態を維持したい。 go-txdbはDBへの接続時にトランザクションを開始、切断時にトランザクションを終了するSQLドライバ。 テスト実行中にトランザクション内で発行したステートメント・行はテスト終了時には消滅する。 DB毎に実装方法は異なり、例えばSQLiteでは\"トランザクション\"ではなくsaveponitで実装される。 [clink implicit=\"false\" url=\"https://github.com/DATA-DOG/go-txdb\" imgurl=\"https://avatars.githubusercontent.com/u/6613360?s=48&v=4\" title=\"Single transaction based sql.Driver for GO\" excerpt=\"Package txdb is a single transaction based database sql driver. When the connection is opened, it starts a transaction and all operations performed on this sql.DB will be within that transaction. If concurrent actions are performed, the lock is acquired and connection is always released the statements and rows are not holding the connection.\"] [arst_toc tag=\"h4\"] 環境構築 Claude Code (Sonnet4.5) で以下の環境を構成した。途中15回のエラーリカバリを挟んで 期待通りの環境が出来上がった。 main.goがアプリケーションのルーティング(ハンドラ共有)、 main_test.goが main.goのルートに対するテスト。テストにはTestMain()が含まれている。 test_repository_test.goはGinが生成したリポジトリ層(モデル)をルートを経由せずテストする。 $ tree . -L 2 . ├── data │ └── db.sqlite # SQLite DBファイル ├── docker-compose.yml # Go+sqlite ├── Dockerfile # golang:1.23-alpineベース ├── gen.go # GORM Genコード生成スクリプト ├── go.mod # 依存関係の定義(go getやgo mod tidyで更新) ├── go.sum # 依存関係の検証用ハッシュ(自動) ├── init.sql # DDL,初期レコード ├── main.go # Gin初期化,ルーティング ├── main_test.go # main.goのルーティングに対するテストコード ├── models # モデル │ ├── model # testsテーブルと対応する構造体定義 (自動生成) │ └── query # (自動生成) ├── repository │ └── test_repository_test.go # リポジトリ層(データアクセス層)のテスト └── testhelper └── db.go # TxDB初期化等テスト用ヘルパー サンプルデータの準備 testsというテーブルに id, value というカラムを用意し、hoge, fuga レコードを挿入しておく。 簡略化のためにSQLiteを使用しており、ホスト側のファイルをbindマウントし初期実行判定して投入した。 -- Create tests table CREATE TABLE IF NOT EXISTS tests ( id INTEGER PRIMARY KEY, value TEXT NOT NULL ); -- Insert initial data INSERT OR IGNORE INTO tests (id, value) VALUES (1, \'hoge\'); INSERT OR IGNORE INTO tests (id, value) VALUES (2, \'fuga\'); CRUD ルーティング gin, gorm(gen) を使用して testsテーブルに対するCRUDを行う以下のルートを定義した。 それぞれ、genを使用しGolang言語のレベルでオブジェクトを操作している。 | メソッド | エンドポイント| 説明 | 仕様 | |--------|------------|----------------|-------------------------------------------------| | GET | /hello | 全レコード取得 | Find()で全レコードを取得し返却 | | GET | /hello/:id | 指定IDのレコード取得 | URLパラメータからIDを取得し、該当レコードを返却 | | POST | /hello | 新規レコード追加 | JSONリクエストボディからidとvalueを受け取り新規作成 | | PATCH | /hello/:id | 指定IDのレコード更新 | URLパラメータのIDとJSONボディのvalueでレコード更新 | | DELETE | /hello/:id | 指定IDのレコード削除 | URLパラメータのIDでレコード削除. | 各ハンドラの詳細な実装は冗長なので割愛。 手動リクエストと応答 各エンドポイント に対するリクエストとレスポンスの関係は以下。期待通り。 # 全件取得し応答 $ curl http://localhost:8080/hello [{\"id\":1,\"value\":\"hoge\"},{\"id\":2,\"value\":\"fuga\"}] # id=1を取得し応答 $ curl http://localhost:8080/hello/1 {\"id\":1,\"value\":\"hoge\"} # id=3を追加 $ curl -X POST http://localhost:8080/hello -H \"Content-Type: application/json\" -d \'{\"id\":3,\"value\":\"piyo\"}\' {\"id\":3,\"value\":\"piyo\"} $ curl http://localhost:8080/hello [{\"id\":1,\"value\":\"hoge\"},{\"id\":2,\"value\":\"fuga\"},{\"id\":3,\"value\":\"piyo\"}] # id=3を変更 $ curl -X PATCH http://localhost:8080/hello/3 -H \"Content-Type: application/json\" -d \'{\"value\":\"updated_piyo\"}\' {\"id\":3,\"value\":\"updated_piyo\"} # id=3を削除 $ curl -X DELETE http://localhost:8080/hello/3 {\"message\":\"record deleted successfully\"} # 全件取得し応答 $ curl http://localhost:8080/hello [{\"id\":1,\"value\":\"hoge\"},{\"id\":2,\"value\":\"fuga\"}] txdbを使用するためのテスト用ヘルパー関数 txdbを使用するためのテスト用ヘルパー関数を以下のように定義しておく。 package testhelper import ( \"database/sql\" \"fmt\" \"os\" \"sync\" \"sync/atomic\" \"github.com/DATA-DOG/go-txdb\" _ \"github.com/mattn/go-sqlite3\" \"gorm.io/driver/sqlite\" \"gorm.io/gorm\" ) var ( once sync.Once connID atomic.Uint64 ) // SetupTxDB initializes txdb driver for testing func SetupTxDB() { once.Do(func() { // Get database path dbPath := os.Getenv(\"DB_PATH\") if dbPath == \"\" { dbPath = \"./data/db.sqlite\" } // Register txdb driver with SQLite-specific options // Use WAL mode and configure for better concurrent access dsn := fmt.Sprintf(\"%s?_journal_mode=WAL&_busy_timeout=5000\", dbPath) txdb.Register(\"txdb\", \"sqlite3\", dsn) }) } // NewTestDB creates a new test database connection with txdb // Each connection will be isolated in a transaction and rolled back after test func NewTestDB() (*gorm.DB, error) { SetupTxDB() // Open connection using txdb driver with unique connection ID // This ensures each test gets its own isolated transaction id := connID.Add(1) sqlDB, err := sql.Open(\"txdb\", fmt.Sprintf(\"connection_%d\", id)) if err != nil { return nil, fmt.Errorf(\"failed to open txdb connection: %w\", err) } // Wrap with GORM db, err := gorm.Open(sqlite.Dialector{ Conn: sqlDB, }, &gorm.Config{}) if err != nil { return nil, fmt.Errorf(\"failed to open gorm connection: %w\", err) } return db, nil } テストの命名規則と共通処理 テストの関数名はTestXXX()のようにTestから始まりキャメルケースを続ける。 TestMain()内に全ての処理の前に実行する処理、後に実行する処理を記述できる。 package main import ( \"bytes\" \"encoding/json\" \"net/http\" \"net/http/httptest\" \"os\" \"testing\" \"gin_txdb/testhelper\" \"github.com/gin-gonic/gin\" \"github.com/stretchr/testify/assert\" \"github.com/stretchr/testify/require\" ) func TestMain(m *testing.M) { // Set DB_PATH for testing os.Setenv(\"DB_PATH\", \"./data/db.sqlite\") // Set Gin to test mode gin.SetMode(gin.TestMode) // Run tests code := m.Run() os.Exit(code) } 全件取得のテスト ヘルパー関数のNewTestDB()を使用することでtxdbを使用してDBに接続している。 defer func()内でコネクションを明示的にクローズする処理を遅延評価(=テスト完了時評価)しているが、 テスト実行中にエラーやpanicが起きた場合に開いたDBを切ることができなくなる問題への対処。 特にSQLiteの場合「接続は常に1つ」なので、切り忘れで接続が開きっぱなしになると、 次のテスト実行でロックエラーが発生する。明示的に閉じることでこの問題を確実に回避できる。 後はアサートを書いていく。 func TestGetAllTests(t *testing.T) { // Setup test database with txdb db, err := testhelper.NewTestDB() require.NoError(t, err) defer func() { sqlDB, _ := db.DB() sqlDB.Close() }() // Setup router using main.go\'s SetupRouter router := SetupRouter(db) // Create request req, _ := http.NewRequest(http.MethodGet, \"/hello\", nil) w := httptest.NewRecorder() // Perform request router.ServeHTTP(w, req) // Assert response assert.Equal(t, http.StatusOK, w.Code) var response []map[string]interface{} err = json.Unmarshal(w.Body.Bytes(), &response) require.NoError(t, err) // Should have 2 initial records assert.Len(t, response, 2) assert.Equal(t, float64(1), response[0][\"id\"]) assert.Equal(t, \"hoge\", response[0][\"value\"]) assert.Equal(t, float64(2), response[1][\"id\"]) assert.Equal(t, \"fuga\", response[1][\"value\"]) } このテストだけ実行してみる。-run オプションでテスト名を指定する。 $ go test -run TestGetAllTests [GIN] 2025/10/15 - 17:17:44 | 200 | 238.666µs | | GET \"/hello\" PASS ok gin_txdb 0.496s 1件取得のテスト(正常系) [GET] /hello/:id のテスト。指定したIDが存在する正常系。 func TestGetTestByID_Success(t *testing.T) { // Setup test database with txdb db, err := testhelper.NewTestDB() require.NoError(t, err) defer func() { sqlDB, _ := db.DB() sqlDB.Close() }() // Setup router router := SetupRouter(db) // Create request req, _ := http.NewRequest(http.MethodGet, \"/hello/1\", nil) w := httptest.NewRecorder() // Perform request router.ServeHTTP(w, req) // Assert response assert.Equal(t, http.StatusOK, w.Code) var response map[string]interface{} err = json.Unmarshal(w.Body.Bytes(), &response) require.NoError(t, err) assert.Equal(t, float64(1), response[\"id\"]) assert.Equal(t, \"hoge\", response[\"value\"]) } 実行結果は以下の通り。 go test -run TestGetTestByID_Success [GIN] 2025/10/15 - 17:24:41 | 200 | 207.25µs | | GET \"/hello/1\" PASS ok gin_txdb 0.330s 1件取得のテスト(異常系) [GET] /hello/:idのテスト。指定したIDが見つからない異常系。 func TestGetTestByID_NotFound(t *testing.T) { // Setup test database with txdb db, err := testhelper.NewTestDB() require.NoError(t, err) defer func() { sqlDB, _ := db.DB() sqlDB.Close() }() // Setup router router := SetupRouter(db) // Create request for non-existent ID req, _ := http.NewRequest(http.MethodGet, \"/hello/999\", nil) w := httptest.NewRecorder() // Perform request router.ServeHTTP(w, req) // Assert response assert.Equal(t, http.StatusNotFound, w.Code) var response map[string]interface{} err = json.Unmarshal(w.Body.Bytes(), &response) require.NoError(t, err) assert.Equal(t, \"record not found\", response[\"error\"]) } 実行結果は以下の通り。 go test -run TestGetTestByID_NotFound ./gin_txdb/main.go:52 record not found [0.105ms] [rows:0] SELECT * FROM `tests` WHERE `tests`.`id` = 999 ORDER BY `tests`.`id` LIMIT 1 [GIN] 2025/10/15 - 17:22:45 | 404 | 542.875µs | | GET \"/hello/999\" PASS ok gin_txdb 0.672s 1件追加のテスト(正常系) [POST] /helloが正常終了した場合、追加したレコードをレスポンスで返す処理のため、 レスポンスで返ってきたデータをアサートしている。 その後、[GET] /hello/:id のレスポンスを使ってアサートしている。 func TestCreateTest_Success(t *testing.T) { // Setup test database with txdb db, err := testhelper.NewTestDB() require.NoError(t, err) defer func() { sqlDB, _ := db.DB() sqlDB.Close() }() // Setup router router := SetupRouter(db) // Create request body payload := map[string]interface{}{ \"id\": 100, \"value\": \"test_value\", } body, _ := json.Marshal(payload) // Create request req, _ := http.NewRequest(http.MethodPost, \"/hello\", bytes.NewBuffer(body)) req.Header.Set(\"Content-Type\", \"application/json\") w := httptest.NewRecorder() // Perform request router.ServeHTTP(w, req) // Assert response assert.Equal(t, http.StatusCreated, w.Code) var response map[string]interface{} err = json.Unmarshal(w.Body.Bytes(), &response) require.NoError(t, err) assert.Equal(t, float64(100), response[\"id\"]) assert.Equal(t, \"test_value\", response[\"value\"]) // Verify the record was actually created req2, _ := http.NewRequest(http.MethodGet, \"/hello/100\", nil) w2 := httptest.NewRecorder() router.ServeHTTP(w2, req2) assert.Equal(t, http.StatusOK, w2.Code) } 実行結果は以下の通り。 $ go test -run TestCreateTest_Success [GIN] 2025/10/15 - 17:30:04 | 201 | 398.167µs | | POST \"/hello\" [GIN] 2025/10/15 - 17:30:04 | 200 | 47.625µs | | GET \"/hello/100\" PASS ok gin_txdb 0.505s 1件追加のテスト(異常なパラメタ。異常系) testsレコードはid,valueカラムを持つ。idのみ(valueなし)を渡した場合400エラーを返す。 func TestCreateTest_MissingFields(t *testing.T) { // Setup test database with txdb db, err := testhelper.NewTestDB() require.NoError(t, err) defer func() { sqlDB, _ := db.DB() sqlDB.Close() }() // Setup router router := SetupRouter(db) // Create request body with missing value field payload := map[string]interface{}{ \"id\": 100, } body, _ := json.Marshal(payload) // Create request req, _ := http.NewRequest(http.MethodPost, \"/hello\", bytes.NewBuffer(body)) req.Header.Set(\"Content-Type\", \"application/json\") w := httptest.NewRecorder() // Perform request router.ServeHTTP(w, req) // Assert response assert.Equal(t, http.StatusBadRequest, w.Code) } 実行結果は以下の通り。 期待通り400エラーが返ったことをアサートできた。 go test -run TestCreateTest_MissingFields [GIN] 2025/10/15 - 17:36:49 | 400 | 139.709µs | | POST \"/hello\" PASS ok gin_txdb 0.501s txdbが正しくトランザクションを分離していることのテスト Claude Code (Sonnet4.5) が (指示していないのに) 自動的にこのテストを作成してくれた。 お勉強を兼ねたテストプロジェクトであることを伝えたために、気を利かせてくれた感がある。 以下をテストする。 トランザクション内での一貫性 (トランザクション内で作成したデータを同一トランザクション内で観察できる) トランザクション間の分離 (別のトランザクションで作成したデータを観察できない。テストは独立している) 自動ロールバックの動作確認 (txdbがClose()時に自動的にロールバックを実行している。DBは初期状態に戻る) あくまで、一貫性、分離、ロールバックの一例を見せてもらうだけなのだが、 こういうことをやりたいのだな、という背景を理解できたのでお勉強としては十分。 func TestTransactionIsolation(t *testing.T) { // This test demonstrates that each test gets isolated transactions t.Run(\"Test1_CreateRecord\", func(t *testing.T) { db, err := testhelper.NewTestDB() require.NoError(t, err) defer func() { sqlDB, _ := db.DB() sqlDB.Close() }() router := SetupRouter(db) // Create a new record payload := map[string]interface{}{ \"id\": 200, \"value\": \"test200\", } body, _ := json.Marshal(payload) req, _ := http.NewRequest(http.MethodPost, \"/hello\", bytes.NewBuffer(body)) req.Header.Set(\"Content-Type\", \"application/json\") w := httptest.NewRecorder() router.ServeHTTP(w, req) assert.Equal(t, http.StatusCreated, w.Code) // Verify it exists in this transaction req2, _ := http.NewRequest(http.MethodGet, \"/hello/200\", nil) w2 := httptest.NewRecorder() router.ServeHTTP(w2, req2) assert.Equal(t, http.StatusOK, w2.Code) }) t.Run(\"Test2_VerifyRollback\", func(t *testing.T) { db, err := testhelper.NewTestDB() require.NoError(t, err) defer func() { sqlDB, _ := db.DB() sqlDB.Close() }() router := SetupRouter(db) // The record created in Test1 should not exist (rolled back) req, _ := http.NewRequest(http.MethodGet, \"/hello/200\", nil) w := httptest.NewRecorder() router.ServeHTTP(w, req) assert.Equal(t, http.StatusNotFound, w.Code) // Should still have only 2 original records req2, _ := http.NewRequest(http.MethodGet, \"/hello\", nil) w2 := httptest.NewRecorder() router.ServeHTTP(w2, req2) var response []map[string]interface{} json.Unmarshal(w2.Body.Bytes(), &response) assert.Len(t, response, 2) }) } まとめ go-txdbを使うことで、テストケース毎にDBを分離できることを確認した。 あればかなり便利だと思う。
gorm互換の型安全なORMであるgenでCRUD APIを試作
GolangとGinでAPIを書くために必要な要素技術を学習中、 ORMは何が良いか考えてみたが、gormは無いなぁと思うに至った。 ORマッパーがどの程度の抽象化を担うべきか、については答えはないと思うが、 Webアプリケーションのシナリオで出てくるテーブル構造と関係程度は完全にSQLを排除して欲しい。 SQLを排除することで可読性が向上するし、静的型付けによる恩恵を得られる。 Genには以下のような特徴がある。 型安全: コンパイル時にエラー検出 自動補完: IDEでメソッドとフィールドが補完される クエリビルダー: Where(q.Product.Name.Like(\"%...\"))のような直感的なAPI GORM互換: 既存のGORMモデルをそのまま使用可能 なぜ\"Gen\"なのかは、ビルド時にGolangコードから静的に(ビルド前に)オブジェクトにアクセスする ために必要なGoオブジェクトを生成する、という仕組みから来ているのではないかと思う。 [clink implicit=\"false\" url=\"https://gorm.io/gen/index.html\" imgurl=\"https://gorm.io/gorm.svg\" title=\"Gen Guides\" excerpt=\"GEN: Friendly & Safer GORM powered by Code Generation.Idiomatic & Reusable API from Dynamic Raw SQL.100% Type-safe DAO API without interface{}.Database To Struct follows GORM conventions.GORM under the hood, supports all features, plugins, DBMS that GORM supports.\"] [arst_toc tag=\"h4\"] 環境構築 サクッとClaudeで環境を作った。実際に商用環境を作るとしたら必要な理解の度合いは上がるだろうが、 試してみるまでの時間が無駄にかかって勿体無いのと、Claudeに入口を教わるのは悪くない。 以下の構成で、Golang+GinにCRUDルートを設定しgenを介してDBアクセスできる。 models以下にテーブルと対応する型定義された構造体が格納される。 また、query以下にGormレベルの操作をGen(Golang)レベルに抽象化する自動生成コードが格納される。 query以下を読むと、GenがGormのラッパーであることが良くわかる。 $ tree . -n 2 . ├── cmd │ └── generate │ └── main.go # マイグレーション ├── config │ └── database.go # DB接続設定 ├── database │ └── database.go # Conenct(), Close(), GetDB()など ├── docker-compose.yml # Golangアプリケーション(8080), PostgreSQL(5432) ├── Dockerfile ├── go.mod ├── go.sum ├── handlers │ └── product.go ├── main.go # CRUD APIのルーティング ├── models │ └── product.go # テーブル->モデル ├── query │ ├── gen.go # モデルを操作するラッパー │ └── products.gen.go # SQLレベルのモデル操作をGolangレベルに抽象化するためのIF └── README.md CRUDルート 早速、CRUD APIのルートを作っていく。Claudeにお任せしたところ商品(Product)のCRUD APIが出来た。 その位置にMigrate置くの本当に良いの? という感があるが、本題はそこではないので省略。 package main import ( \"log\" \"github.com/gin-gonic/gin\" \"github.com/gin-gonic/gin/binding\" \"github.com/ikuty/golang-gin/database\" \"github.com/ikuty/golang-gin/handlers\" \"github.com/ikuty/golang-gin/models\" \"github.com/ikuty/golang-gin/query\" ) func main() { // データベース接続 if err := database.Connect(); err != nil { log.Fatalf(\"Failed to connect to database: %v\", err) } defer database.Close() // マイグレーション実行 db := database.GetDB() if err := db.AutoMigrate(&models.Product{}); err != nil { log.Fatalf(\"Failed to migrate database: %v\", err) } // Gen初期化 query.SetDefault(db) // Ginエンジンの初期化 r := gin.Default() // 8. GORM + PostgreSQL - CRUD操作 r.GET(\"/api/products\", handlers.GetProductsHandler) // 全商品取得 r.GET(\"/api/products/:id\", handlers.GetProductHandler) // 商品詳細取得 r.POST(\"/api/products\", handlers.CreateProductHandler) // 商品作成 r.PUT(\"/api/products/:id\", handlers.UpdateProductHandler) // 商品更新 r.DELETE(\"/api/products/:id\", handlers.DeleteProductHandler) // 商品削除 r.GET(\"/api/products/search\", handlers.SearchProductsHandler) // 商品検索 // サーバー起動 r.Run(\":8080\") } モデル さて、モデル定義(=テーブル構造)はどうなっているかというと、以下の通り。 フィールドの物理型をGenを介してGolangで厳密で管理できるのは動的型付け言語にはない利点。 package models import ( \"time\" \"gorm.io/gorm\" ) // Product は商品モデル type Product struct { ID uint `gorm:\"primarykey\" json:\"id\"` Name string `gorm:\"size:100;not null\" json:\"name\" binding:\"required\"` Description string `gorm:\"size:500\" json:\"description\"` Price float64 `gorm:\"not null\" json:\"price\" binding:\"required,gt=0\"` Stock int `gorm:\"default:0\" json:\"stock\"` Category string `gorm:\"size:50\" json:\"category\"` CreatedAt time.Time `json:\"created_at\"` UpdatedAt time.Time `json:\"updated_at\"` DeletedAt gorm.DeletedAt `gorm:\"index\" json:\"-\"` } // TableName はテーブル名を指定 func (Product) TableName() string { return \"products\" } ハンドラ(商品詳細取得) 素晴らしい。説明が不要なくらいDBアクセスが抽象化されている。 ただ、依存性注入があるEloquentと比べるとロジックと関係ない冗長な処理が残っている。 db,q,Contextは裏側に隠して欲しいという思いはあるものの、これでも良いかとも思う。 Find()はGenにより自動生成される。interfaceが用意されビルド時に全て解決される。 なお、VSCodeなどで補完が効く、というのは、例えば JetBrains環境であれば、 動的型付け言語であってもほぼ実現されているので、それほど実利があるメリットではない。 package handlers import ( \"net/http\" \"strconv\" \"github.com/gin-gonic/gin\" \"github.com/ikuty/golang-gin/database\" \"github.com/ikuty/golang-gin/models\" \"github.com/ikuty/golang-gin/query\" ) // GetProductsHandler は全商品を取得 func GetProductsHandler(c *gin.Context) { db := database.GetDB() q := query.Use(db) products, err := q.Product.WithContext(c.Request.Context()).Find() if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ \"error\": \"Failed to fetch products\", }) return } c.JSON(http.StatusOK, gin.H{ \"data\": products, \"count\": len(products), }) } ハンドラ(指定の商品を取得) バリデータを介さず自力でバリデーション(IDがUintか)を行っている。 Productに対してWhereで条件指定し(Order By Ascした後に)先頭のオブジェクトを取得している。 もはや他に説明が必要ないくらい抽象化されていて良い。 // GetProductHandler は指定IDの商品を取得 func GetProductHandler(c *gin.Context) { id := c.Param(\"id\") idUint, err := strconv.ParseUint(id, 10, 32) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ \"error\": \"Invalid ID\", }) return } db := database.GetDB() q := query.Use(db) product, err := q.Product.WithContext(c.Request.Context()).Where(q.Product.ID.Eq(uint(idUint))).First() if err != nil { c.JSON(http.StatusNotFound, gin.H{ \"error\": \"Product not found\", }) return } c.JSON(http.StatusOK, gin.H{ \"data\": product, }) } ハンドラ(商品作成) 次はCreate。モデルオブジェクトを空から生成し入力値をバインドして整形した後に、 Create()に渡している。Create()の内部はGormレベルの(低レイヤの)コードが動く。 // CreateProductHandler は商品を作成 func CreateProductHandler(c *gin.Context) { var product models.Product if err := c.ShouldBindJSON(&product); err != nil { c.JSON(http.StatusBadRequest, gin.H{ \"error\": \"Invalid request\", \"details\": err.Error(), }) return } db := database.GetDB() q := query.Use(db) if err := q.Product.WithContext(c.Request.Context()).Create(&product); err != nil { c.JSON(http.StatusInternalServerError, gin.H{ \"error\": \"Failed to create product\", }) return } c.JSON(http.StatusCreated, gin.H{ \"message\": \"Product created successfully\", \"data\": product, }) } ハンドラ(商品更新) 基本的にはCreate()と同じ。空モデルに入力値をバインドしUpdate()に渡している。 実行後に更新対象のオブジェクトを取得しているがEloquentは確か更新の戻りがオブジェクトだった。 // UpdateProductHandler は商品を更新 func UpdateProductHandler(c *gin.Context) { id := c.Param(\"id\") idUint, err := strconv.ParseUint(id, 10, 32) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ \"error\": \"Invalid ID\", }) return } db := database.GetDB() q := query.Use(db) ctx := c.Request.Context() // 既存の商品を取得 product, err := q.Product.WithContext(ctx).Where(q.Product.ID.Eq(uint(idUint))).First() if err != nil { c.JSON(http.StatusNotFound, gin.H{ \"error\": \"Product not found\", }) return } // 更新データをバインド var updateData models.Product if err := c.ShouldBindJSON(&updateData); err != nil { c.JSON(http.StatusBadRequest, gin.H{ \"error\": \"Invalid request\", \"details\": err.Error(), }) return } // 更新実行 _, err = q.Product.WithContext(ctx).Where(q.Product.ID.Eq(uint(idUint))).Updates(&updateData) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ \"error\": \"Failed to update product\", }) return } // 更新後のデータを取得 product, _ = q.Product.WithContext(ctx).Where(q.Product.ID.Eq(uint(idUint))).First() c.JSON(http.StatusOK, gin.H{ \"message\": \"Product updated successfully\", \"data\": product, }) } ハンドラ(論理削除) DeletedAtフィールドがNULLの場合、そのレコードはアクティブ。非Nullなら論理削除済み。 Unscoped()を介さずDelete()した場合(つまりデフォルトでは)論理削除となる。 DeletedAtは他のAPIから透過的に扱われる。論理削除状態かどうかは把握しなくて良い。 DeletedAtはデフォルトでは*time.Time型だが、のデータ形式の対応も可能。 // DeleteProductHandler は商品を削除(ソフトデリート) func DeleteProductHandler(c *gin.Context) { id := c.Param(\"id\") idUint, err := strconv.ParseUint(id, 10, 32) if err != nil { c.JSON(http.StatusBadRequest, gin.H{ \"error\": \"Invalid ID\", }) return } db := database.GetDB() q := query.Use(db) // ソフトデリート実行 _, err = q.Product.WithContext(c.Request.Context()).Where(q.Product.ID.Eq(uint(idUint))).Delete() if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ \"error\": \"Failed to delete product\", }) return } c.JSON(http.StatusOK, gin.H{ \"message\": \"Product deleted successfully\", }) } ハンドラ(商品検索) Where句を複数記述する場合など、手続き的に条件用のオブジェクトを足していける。 一見、productQueryを上から上書きしているように見えるが、Genのクエリビルダーはimmutableパターン として振る舞い、都度実行によりWhereの戻りとなるオブジェクトが累積していく動作となる。 // SearchProductsHandler は商品を検索 func SearchProductsHandler(c *gin.Context) { db := database.GetDB() q := query.Use(db) ctx := c.Request.Context() // クエリパラメータを取得 name := c.Query(\"name\") category := c.Query(\"category\") minPrice := c.Query(\"min_price\") maxPrice := c.Query(\"max_price\") // クエリビルダー productQuery := q.Product.WithContext(ctx) if name != \"\" { productQuery = productQuery.Where(q.Product.Name.Like(\"%\" + name + \"%\")) } if category != \"\" { productQuery = productQuery.Where(q.Product.Category.Eq(category)) } if minPrice != \"\" { if price, err := strconv.ParseFloat(minPrice, 64); err == nil { productQuery = productQuery.Where(q.Product.Price.Gte(price)) } } if maxPrice != \"\" { if price, err := strconv.ParseFloat(maxPrice, 64); err == nil { productQuery = productQuery.Where(q.Product.Price.Lte(price)) } } // 検索実行 products, err := productQuery.Find() if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ \"error\": \"Failed to search products\", }) return } c.JSON(http.StatusOK, gin.H{ \"data\": products, \"count\": len(products), }) } 変換後のクエリを見てみる。 $ http://localhost:8080/api/products/search?name=Test&category=Electronics&min_price=1400&max_price=1600 SELECT * FROM \"products\" WHERE \"products\".\"name\" LIKE \'%Test%\' AND \"products\".\"category\" = \'Electronics\' AND \"products\".\"price\" >= 1400 AND \"products\".\"price\" <= 1600 AND "products"."deleted_at" IS NULL まとめ GolangのORMであるGormをラップしたGenを使って、CRUDを行うAPIをGinで書いて動かしてみた。 確かにGormレベル(SQLレベル)の記述が不要であることを確認した。 (まだ見ていないが)テーブルをJOINしていった先にGormを素で触らないといけない場面は発生するだろうが、 多くのシナリオでGenだけで行けるのであれば、Genを導入するメリットとなるのではないだろうか。
Golang + Gin カスタムバリデーション
Golang+GinによるAPI構築で使いそうなフィーチャーを試してみるシリーズ。 今回はカスタムバリデーションを試してみる。 [clink implicit=\"false\" url=\"https://gin-gonic.com/ja/docs/examples/custom-validators/\" imgurl=\"https://gin-gonic.com/_astro/gin.D6H2T_2v_ZD2G7l.webp\" title=\"カスタムバリデーション\" excerpt=\"カスタムしたバリデーションを使用することもできます。サンプルコード も見てみてください。\"] [arst_toc tag=\"h4\"] ルーティング バリデーションを外部に移譲することで、ハンドラからロジック以外の冗長な処理を除くことができる。 Ginはカスタムバリデータを用意している。以下の例では、ユーザ登録を行うPOSTリクエストの例。 組み込みのバリデーション・バインディングと合わせて、パスワードバリデーションロジックの 追加を行っている。 package main import ( \"github.com/gin-gonic/gin\" \"github.com/gin-gonic/gin/binding\" \"github.com/go-playground/validator/v10\" \"github.com/ikuty/golang-gin/handlers\" ) func main() { // Ginエンジンの初期化 r := gin.Default() // カスタムバリデーターを登録 if v, ok := binding.Validator.Engine().(*validator.Validate); ok { handlers.InitCustomValidators(v) } // 7. カスタムバリデーション r.POST(\"/api/register\", handlers.RegisterValidatorHandler) // サーバー起動 r.Run(\":8080\") } ハンドラ リクエストで受けたJSONをRegisterRequest構造体にバインディングする際に、組み込みの バリデーションルールを定義するのとは別に、strongpassword というカスタムルールを定義している。 strongpasswordルールの実体は strongPassword() 。 例に出現するオブジェクトの使い方は、まぁこう使うのかぐらいで、ありがちな感じ。 カスタムバリデータ関数がチェック結果をTrue/Falseで返せばよさそう。 組み込みバリデータ、または、カスタムバリデータのバリデーション結果と文字列の対応を定義し、 その文字列をレスポンスに付与して返す、というのは良くあるパターンで、 Ginで実装する場合は、 また、カスタムバリデータのバリデーション結果と文字列の対応を定義しレスポンスに含める、 というパターンは良くありそうで、構造体へのバインディングで発生したエラー(err)を取得し、 errに対する型アサーションを行った上で、errを validator.ValidationErrors型として扱う。 動的型付けだと、発生したerrが本当に期待したオブジェクトなのか実行するまで分からなが、 全ての処理が静的型付けを通して、実行前に実行可能であることが確認される。 package handlers import ( \"net/http\" \"regexp\" \"github.com/gin-gonic/gin\" \"github.com/go-playground/validator/v10\" ) // RegisterRequest はユーザー登録リクエストの構造体(高度なバリデーション付き) type RegisterRequest struct { Username string `json:\"username\" binding:\"required,min=3,max=20,alphanum\"` Email string `json:\"email\" binding:\"required,email\"` Password string `json:\"password\" binding:\"required,min=8,max=50,strongpassword\"` Age int `json:\"age\" binding:\"required,gte=18,lte=100\"` Website string `json:\"website\" binding:\"omitempty,url\"` Phone string `json:\"phone\" binding:\"omitempty,e164\"` // E.164 形式の電話番号 } // カスタムバリデーター: 強力なパスワードチェック func strongPassword(fl validator.FieldLevel) bool { password := fl.Field().String() // 最低1つの大文字、1つの小文字、1つの数字を含む hasUpper := regexp.MustCompile(`[A-Z]`).MatchString(password) hasLower := regexp.MustCompile(`[a-z]`).MatchString(password) hasNumber := regexp.MustCompile(`[0-9]`).MatchString(password) return hasUpper && hasLower && hasNumber } // RegisterValidatorHandler はカスタムバリデーターを使用するハンドラー func RegisterValidatorHandler(c *gin.Context) { var req RegisterRequest // JSON をバインド if err := c.ShouldBindJSON(&req); err != nil { // バリデーションエラーを詳細に返す c.JSON(http.StatusBadRequest, gin.H{ \"error\": \"Validation failed\", \"details\": formatValidationError(err), }) return } c.JSON(http.StatusCreated, gin.H{ \"message\": \"Registration successful\", \"username\": req.Username, \"email\": req.Email, }) } // formatValidationError はバリデーションエラーをわかりやすく整形 func formatValidationError(err error) []string { var errors []string if validationErrors, ok := err.(validator.ValidationErrors); ok { for _, e := range validationErrors { var message string switch e.Tag() { case \"required\": message = e.Field() + \" is required\" case \"email\": message = e.Field() + \" must be a valid email address\" case \"min\": message = e.Field() + \" must be at least \" + e.Param() + \" characters\" case \"max\": message = e.Field() + \" must be at most \" + e.Param() + \" characters\" case \"alphanum\": message = e.Field() + \" must contain only letters and numbers\" case \"gte\": message = e.Field() + \" must be greater than or equal to \" + e.Param() case \"lte\": message = e.Field() + \" must be less than or equal to \" + e.Param() case \"url\": message = e.Field() + \" must be a valid URL\" case \"e164\": message = e.Field() + \" must be a valid phone number (E.164 format)\" case \"strongpassword\": message = e.Field() + \" must contain at least one uppercase letter, one lowercase letter, and one number\" default: message = e.Field() + \" is invalid\" } errors = append(errors, message) } } else { errors = append(errors, err.Error()) } return errors } // InitCustomValidators はカスタムバリデーターを登録する func InitCustomValidators(v *validator.Validate) { v.RegisterValidation(\"strongpassword\", strongPassword) } 実行結果 リクエストに対してバリデーションが行われ、期待通りバリデーションエラーがアサートされていて、 アサートと対応するカスタム文字列がレスポンスに含まれていることが確認できる。 $ curl -X POST http://localhost:8080/api/register -H \"Content-Type: application/json\" -d \'{\"username\":\"john123\",\"email\":\"john@example.com\",\"password\":\"SecurePass123\",\"age\":25,\"website\":\"https://example.com\"}\' {\"email\":\"john@example.com\",\"message\":\"Registration successful\",\"username\":\"john123\"} 2. ユーザー名が短すぎる {\"details\":[\"Username must be at least 3 characters\"],\"error\":\"Validation failed\"} 3. 弱いパスワード(カスタムバリデーター) {\"details\":[\"Password must contain at least one uppercase letter, one lowercase letter, and one number\"],\"error\":\"Validation failed\"} 4. 年齢が18歳未満 {\"details\":[\"Age must be greater than or equal to 18\"],\"error\":\"Validation failed\"} まとめ 組み込みバリデーションの他に、カスタムバリデーションを追加できることを確認した。 静的型付けにより実行時エラーに頼ることのないある種の堅牢さがあることも見てとれた。
Golang + Gin Framework で Hello World してみた話 〜基本的なルーティング、バスパラメタ・クエリパラメタ・JSON Req/Res、フォームデータ
Golang+GinでAPIを大量に書くことになりそうなので予習することにする。 コード自体はAI Agentで書こうと思うが、まずはGinのフィーチャーを把握する必要がある。 AI Agentを使用してAPI毎にフィーチャーを試せる学習用プロジェクトを構築する。 著者のスペックは、昔仕事でLaravelでWebアプリを書いたことがある。 [arst_toc tag=\"h4\"] Ginについて 🚀 高速なパフォーマンス martini に似たAPIを持ちながら、httprouter のおかげでそれより40倍以上も速いパフォーマンスがあります。 **基数木(Radix Tree)**ベースのルーティングを採用しており、メモリ効率が良く、高速なルーティングを実現しています。 他のGo製Webフレームワークと比較して、ベンチマークで優れた速度を示すことが多く、特に高スループットな REST API や マイクロサービス の構築に適しています。 Laravelは遅くて有名だったが、速いのは良いこと。 Golang自体ネイティブ実行だし、Golang用フレームワークの中でも速度にフィーチャーした構造。 たいした同時実行数を捌かないなら別に遅くても良いし、速いなら良いよね、ぐらい。 🧩 ミドルウェアのサポート 受信したHTTPリクエストを、ミドルウェアのチェーンと最終的なアクション(ハンドラー)で処理する仕組みを提供します。 ロガー、認証、GZIP圧縮など、様々な機能を簡単に組み込むことができます。 ミドルウェアくらい使えないと困るよね。認証を書きたい。 🛡️ クラッシュフリー HTTPリクエスト処理中に発生したpanicをキャッチし、**リカバリー(回復)**する機能が組み込まれています。 これにより、サーバーがクラッシュするのを防ぎ、サービスを常に利用可能な状態に保ちます。 🔗 ルートのグループ化 認証が必要なルートやAPIのバージョンごとなど、関連するルートをグループ化して整理する機能があり、共通のミドルウェアを適用しやすいです。 フルスタックフレームワークではないので、これだけしか書かれていない。 シンプルであることは良いこと。 学習用プロジェクトの構成 いったん、こんな感じで構成。 golang-gin/ ├── docker-compose.yml ├── Dockerfile ├── go.mod ├── go.sum ├── main.go ├── README.md └── handlers/ ├── hello.go # Hello World API ├── params.go # パラメータ処理 ├── json.go # JSON処理 ├── middleware.go # ミドルウェア ├── validation.go # バリデーション ├── file.go # ファイルアップロード └── grouping.go # ルートグループ化 学習計画とAPI API毎にフィーチャーを実装していくスタイルとする。 Claude Codeにその一覧を出力すると以下の通り。 | No. | 機能 | エンドポイント | メソッド | 説明 | |-----|--------------|----------------------|------|----------------------| | 1 | 基本的なルーティング | /hello | GET | Hello World を返す基本API | | 2 | パスパラメータ | /users/:id | GET | URL パスからパラメータを取得 | | 3 | クエリパラメータ | /search | GET | クエリ文字列からパラメータを取得 | | 4 | JSON レスポンス | /api/user | GET | 構造体を JSON で返す | | 5 | JSON リクエスト | /api/user | POST | JSON をバインドして処理 | | 6 | フォームデータ | /form | POST | フォームデータの受け取り | | 7 | バリデーション | /api/register | POST | 入力データのバリデーション | | 8 | ファイルアップロード | /upload | POST | 単一ファイルのアップロード | | 9 | 複数ファイルアップロード | /upload/multiple | POST | 複数ファイルのアップロード | | 10 | ミドルウェア (ログ) | /api/protected | GET | カスタムミドルウェアの実装 | | 11 | ルートグループ化 | /v1/users, /v2/users | GET | API バージョニング | | 12 | エラーハンドリング | /error | GET | エラーレスポンスの処理 | | 13 | カスタムバリデーター | /api/validate | POST | カスタムバリデーションルール | | 14 | リダイレクト | /redirect | GET | リダイレクト処理 | | 15 | 静的ファイル配信 | /static/* | GET | 静的ファイルの提供 | Hello World まずは Hello World を返すAPIを作る。 main.goは以下の通り。./handlers 以下に実態を書いていく。 package main import ( \"github.com/gin-gonic/gin\" \"github.com/ikuty/golang-gin/handlers\" ) func main() { // Ginエンジンの初期化 r := gin.Default() // Hello World API r.GET(\"/hello\", handlers.HelloHandler) // サーバー起動 r.Run(\":8080\") } ./handlers/hello.go は以下の通り。 package handlers import ( \"net/http\" \"github.com/gin-gonic/gin\" ) // HelloHandler は Hello World を返すハンドラー func HelloHandler(c *gin.Context) { c.JSON(http.StatusOK, gin.H{ \"message\": \"Hello World\", }) } 試す。入門した。 $ curl http://localhost:8080/hello {\"message\":\"Hello World\"} パスパラメータ URL内にプレースホルダを設定し、URLのプレースホルダと対応する値を変数で受けられる機能。 package main import ( \"github.com/gin-gonic/gin\" \"github.com/ikuty/golang-gin/handlers\" ) func main() { // Ginエンジンの初期化 r := gin.Default() // 1. 基本的なルーティング r.GET(\"/hello\", handlers.HelloHandler) // 2. パスパラメータ r.GET(\"/users/:id\", handlers.GetUserByIDHandler) // サーバー起動 r.Run(\":8080\") } ./handlers/params.goは以下。 Laravelと同じところに違和感。型はどこいった..? Ginでは、パスパラメータは常に文字列(string)として取得される。 URLから取得したパラメータを別の型(intやuintなど)として扱いたい場合は、 取得した文字列を明示的に型変換する必要がある。 package handlers import ( \"net/http\" \"github.com/gin-gonic/gin\" ) // GetUserByIDHandler は URL パスパラメータからユーザーIDを取得するハンドラー func GetUserByIDHandler(c *gin.Context) { // パスパラメータ :id を取得 id := c.Param(\"id\") c.JSON(http.StatusOK, gin.H{ \"user_id\": id, \"message\": \"User ID retrieved from path parameter\", }) } 実行。 # 数値IDのテスト $ curl http://localhost:8080/users/123 {\"message\":\"User ID retrieved from path parameter\",\"user_id\":\"123\"} # 文字列IDのテスト $ curl http://localhost:8080/users/alice {\"message\":\"User ID retrieved from path parameter\",\"user_id\":\"alice\"} クエリパラメータ クエリパラメータを受け取る方法は以下。 まぁシンプル。 package handlers import ( \"net/http\" \"github.com/gin-gonic/gin\" ) // SearchHandler は クエリパラメータから検索条件を取得するハンドラー func SearchHandler(c *gin.Context) { // クエリパラメータを取得 query := c.Query(\"q\") // ?q=keyword page := c.DefaultQuery(\"page\", \"1\") // ?page=2 (デフォルト値: \"1\") limit := c.DefaultQuery(\"limit\", \"10\") // ?limit=20 (デフォルト値: \"10\") // オプショナルなパラメータ sort := c.Query(\"sort\") // 値がない場合は空文字列 c.JSON(http.StatusOK, gin.H{ \"query\": query, \"page\": page, \"limit\": limit, \"sort\": sort, \"message\": \"Query parameters retrieved successfully\", }) } 実行結果は以下。 # パスパラメータ $ curl http://localhost:8080/users/123 {\"message\":\"User ID retrieved from path parameter\",\"user_id\":\"123\"} # クエリパラメータ $ curl \"http://localhost:8080/search?q=test&page=2\" {\"limit\":\"10\",\"message\":\"Query parameters retrieved successfully\",\"page\":\"2\",\"query\":\"test\",\"sort\":\"\"} JSONリクエスト/JSONレスポンス Content-Type: application/json で半構造化データ(JSON)を送り、構造体で受けることができる。 また、構造体を Content-Type: application/json でJSON文字列を返すことができる。 構造体のメンバに型を定義しておくことで、文字列がメンバ型に変換(バインド)できる。 まずルーティングは以下の通り。 package main import ( \"github.com/gin-gonic/gin\" \"github.com/ikuty/golang-gin/handlers\" ) func main() { // Ginエンジンの初期化 r := gin.Default() // 4. JSON レスポンス r.GET(\"/api/user\", handlers.GetUserHandler) // 5. JSON リクエスト r.POST(\"/api/user\", handlers.CreateUserHandler) // サーバー起動 r.Run(\":8080\") } ハンドラは以下の通り。 バインドの記述が興味深い。バインド時にバリデーションを実行している。 package handlers import ( \"net/http\" \"github.com/gin-gonic/gin\" ) // User 構造体 type User struct { ID int `json:\"id\"` Name string `json:\"name\"` Email string `json:\"email\"` Age int `json:\"age\"` IsActive bool `json:\"is_active\"` } // GetUserHandler は 構造体を JSON で返すハンドラー func GetUserHandler(c *gin.Context) { // サンプルユーザーデータ user := User{ ID: 1, Name: \"John Doe\", Email: \"john@example.com\", Age: 30, IsActive: true, } c.JSON(http.StatusOK, user) } // CreateUserRequest はユーザー作成リクエストの構造体 type CreateUserRequest struct { Name string `json:\"name\" binding:\"required\"` Email string `json:\"email\" binding:\"required,email\"` Age int `json:\"age\" binding:\"required,gte=0,lte=150\"` } // CreateUserHandler は JSON リクエストをバインドして処理するハンドラー func CreateUserHandler(c *gin.Context) { var req CreateUserRequest // JSON をバインド(バリデーションも実行される) if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{ \"error\": \"Invalid request\", \"details\": err.Error(), }) return } // 作成されたユーザーを返す(実際はDBに保存する) user := User{ ID: 100, // 仮のID Name: req.Name, Email: req.Email, Age: req.Age, IsActive: true, } c.JSON(http.StatusCreated, gin.H{ \"message\": \"User created successfully\", \"user\": user, }) } 実行結果は以下。 1. GET - JSON レスポンス $ curl http://localhost:8080/api/user {\"id\":1,\"name\":\"John Doe\",\"email\":\"john@example.com\",\"age\":30,\"is_active\":true} 2. POST - 正常なリクエスト $ curl -X POST http://localhost:8080/api/user -H \"Content-Type: application/json\" -d \'{\"name\":\"Alice\",\"email\":\"alice@example.com\",\"age\":25}\' {\"message\":\"User created successfully\",\"user\":{\"id\":100,\"name\":\"Alice\",\"email\":\"alice@example.com\",\"age\":25,\"is_active\":true}} 3. POST - バリデーションエラー(メール形式) $ curl -X POST http://localhost:8080/api/user -H \"Content-Type: application/json\" -d \'{\"name\":\"Bob\",\"email\":\"invalid-email\",\"age\":30}\' {\"details\":\"Key: \'CreateUserRequest.Email\' Error:Field validation for \'Email\' failed on the \'email\' tag\",\"error\":\"Invalid request\"} 4. POST - バリデーションエラー(年齢範囲) $ curl -X POST http://localhost:8080/api/user -H \"Content-Type: application/json\" -d \'{\"name\":\"Charlie\",\"email\":\"charlie@example.com\",\"age\":200}\' {\"details\":\"Key: \'CreateUserRequest.Age\' Error:Field validation for \'Age\' failed on the \'lte\' tag\",\"error\":\"Invalid request\"} フォームデータ フォームデータの送信例。ルーティングは以下。 POSTで送ったフィールドを丸っと構造体にする例と、 それぞれのフィールドを個別に取得する例の2つ。 package main import ( \"github.com/gin-gonic/gin\" \"github.com/ikuty/golang-gin/handlers\" ) func main() { // Ginエンジンの初期化 r := gin.Default() // 6. フォームデータ r.POST(\"/form/login\", handlers.LoginHandler) r.POST(\"/form/post\", handlers.PostFormHandler) // サーバー起動 r.Run(\":8080\") } ハンドラは以下。丸っとフォームデータを構造体にバインドできるし、 個別にアクセスすることもできる。 シンプルというか、少ない道具でなんとかするタイプ。 package handlers import ( \"net/http\" \"github.com/gin-gonic/gin\" ) // LoginForm はログインフォームの構造体 type LoginForm struct { Username string `form:\"username\" binding:\"required\"` Password string `form:\"password\" binding:\"required,min=6\"` Remember bool `form:\"remember\"` } // LoginHandler はフォームデータを受け取るハンドラー func LoginHandler(c *gin.Context) { var form LoginForm // フォームデータをバインド if err := c.ShouldBind(&form); err != nil { c.JSON(http.StatusBadRequest, gin.H{ \"error\": \"Invalid form data\", \"details\": err.Error(), }) return } // 実際はここで認証処理を行う c.JSON(http.StatusOK, gin.H{ \"message\": \"Login successful\", \"username\": form.Username, \"remember\": form.Remember, }) } // PostFormHandler は個別にフォームフィールドを取得するハンドラー func PostFormHandler(c *gin.Context) { // 個別のフォームフィールドを取得 title := c.PostForm(\"title\") content := c.DefaultPostForm(\"content\", \"No content provided\") tags := c.PostFormArray(\"tags\") // 配列として取得 c.JSON(http.StatusOK, gin.H{ \"message\": \"Form data received\", \"title\": title, \"content\": content, \"tags\": tags, }) } 実行例は以下。 1. ログインフォーム - 正常 $ curl -X POST http://localhost:8080/form/login -d \"username=john&password=secret123\" {\"message\":\"Login successful\",\"remember\":false,\"username\":\"john\"} 2. ログインフォーム - remember 付き $ curl -X POST http://localhost:8080/form/login -d \"username=alice&password=pass123&remember=true\" {\"message\":\"Login successful\",\"remember\":true,\"username\":\"alice\"} 3. ログインフォーム - バリデーションエラー $ curl -X POST http://localhost:8080/form/login -d \"username=bob&password=123\" {\"details\":\"Key: \'LoginForm.Password\' Error:Field validation for \'Password\' failed on the \'min\' tag\",\"error\":\"Invalid form data\"} 4. 投稿フォーム - 配列データ $ curl -X POST http://localhost:8080/form/post -d \"title=Hello&content=World&tags=go&tags=gin&tags=api\" {\"content\":\"World\",\"message\":\"Form data received\",\"tags\":[\"go\",\"gin\",\"api\"],\"title\":\"Hello\"} まとめ いったん、以下を試した。 基本的なルーティング バスパラメタ・クエリパラメタ JSON Request/Response フォームデータ シンプルすぎてClaude Codeが機能を絞っているのか疑ったが、 公式を読む限り、若干バリエーションが増える程度の様子。 これならわざわざClaudeに入門コースを作ってもらわなくても上から読めば良いかな。
Snowflake MCPサーバを試してみた
何周遅れか分からないが、Snowflake MCPサーバを試してみたのでアウトプットしてみる。 AI AgentはClaude Code。MCPの構築と接続設定自体をClaude Codeで行なった。 この記事で使用したMCPサーバは以下。いわゆる野良MCPサーバ。 [clink implicit=\"false\" url=\"https://github.com/isaacwasserman/mcp-snowflake-server\" imgurl=\"https://camo.githubusercontent.com/bdcfca988b369e51051c3201cedfc429354b0801a0c5d88aa3eb00ae37e7188b/68747470733a2f2f6d736565702e6e65742f70722f69736161637761737365726d616e2d6d63702d736e6f77666c616b652d7365727665722d62616467652e706e67\" title=\"Snowflake MCP Server\" excerpt=\"A Model Context Protocol (MCP) server implementation that provides database interaction with Snowflake. This server enables running SQL queries via tools and exposes data insights and schema context as resources.\"] [arst_toc tag=\"h4\"] 前提となる環境 Macにnode、uv、Claude Codeを導入済み。 # 諸々のバージョンは以下 $ sw_vers ProductName: macOS ProductVersion: 15.6 BuildVersion: 24G84 # nodeは導入済み $ node --version v24.4.1 # uvは導入済み $ uv --version 0.8.13 (ede75fe62 2025-08-21)0.8.13 (ede75fe62 2025-08-21) # Claude Codeは導入済み $ claude --version 1.0.89 (Claude Code) # 検証用ディレクトリの作成と移動。以降ここで検証を実施。 $ mkdir snowflake-mcp-server && cd $_ 環境構築 プロンプトとその回答は省略する。要件を伝え環境構築を実施してもらった。 Claudeに対するMCPの設定方法としてグローバルに設定する方法と、 プロジェクト個別に設定する方法があるが、当初グローバル設定の方法を案内された。 2回目のプロンプトでプロジェクト個別設定に変更してもらった。 最終的なファイル構成は以下の通り。 $ tree . . ├── mcp-snowflake-server │ ├── Dockerfile │ ├── example_connections.toml │ ├── LICENSE │ ├── pyproject.toml │ ├── README.md │ ├── runtime_config.json │ ├── src │ │ └── mcp_snowflake_server │ │ ├── __init__.py │ │ ├── db_client.py │ │ ├── serialization.py │ │ ├── server.py │ │ └── write_detector.py │ └── uv.lock ├── PROJECT_SPECIFIC_SETUP.md ├── SETUP.md ├── snowflake_connections.toml └── test_server.py snowflake_connections.toml に Snowflakeの接続設定を書くだけで終了。 試してみた claudeを起動すると .mcp.json が認識された。 $ claude ╭─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮ │ │ │ New MCP server found in .mcp.json: snowflake │ │ │ │ MCP servers may execute code or access system resources. All tool calls require approval. Learn more in the MCP documentation │ │ (https://docs.anthropic.com/s/claude-code-mcp). │ │ │ │ ❯ 1. Use this and all future MCP servers in this project │ │ 2. Use this MCP server │ │ 3. Continue without using this MCP server │ │ │ ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ > Snowflakeのデータベース `sample_db` におけるスキーマの一覧を教えてください。 ⏺ snowflake - list_schemas (MCP)(database: \"sample_db\") ⎿ type: data data_id: e40f8385-e3c0-4848-aa36-c1b8fada788f database: sample_db … +8 lines (ctrl+r to expand) ⎿ [Resource from snowflake at data://e40f8385-e3c0-4848-aa36-c1b8fada788f] { \"type\": \"data\", \"data_id\": \"e40f8385-e3c0-4848-aa36-c1b8fada788f\", … +25 lines (ctrl+r to expand) ⏺ sample_db データベースには以下の7つのスキーマがあります: - CLEANSED - INTERMEDIATE - PUBLIC - SAMPLE_SCHEMA > cleansedスキーマ内のテーブルの一覧を教えてください。 ⏺ CLEANSED スキーマには以下の19のテーブルがあります: ... メタ情報(description) dbtでモデルを作る際にdescriptionを付与することで、テーブル、カラムにdescriptionを付与できる。 その先、セマンティックレイヤーの存在がチラつくが、descriptionだけでもそこそこいける。 まとめ Claude Codeにローカル起動型(Dockerfile)のSnowflake MCPサーバを導入してみた。 その際、Claude Code自身にMCPサーバの導入を依頼し、ほぼノーコードで導入できることを確認した。 テーブル、カラムにdescriptionを付与しておくと、MCPサーバを通じて取得・消費される。 クエリによって得られる知識がClaudeに蓄積され、知識の文脈で会話ができるようになることを確認した。
Fellegi-Sunterモデルに基づく確率的名寄せパッケージ Splinkを試してみる
[mathjax] Record Linkage、Entity Recognitionなど、いわゆる「名寄せ」問題には、割とさまざまな解決策が 提案されている。その1つに確率論的な数学的背景を持つFellegi-Sunterモデルを実装したSplinkがある。 数学的背景について咀嚼して説明してくれているサイトはあまりなく、Splinkの公式ドキュメントが唯一 に近いソースだろうと思う。 今回は5万人の人物データを使って重複を除去するSplinkの主要な機能のサンプルを追ってみた。 数式を読んだところで実りは薄い、ということもあり、なるべく数式を読まずに読み進めたが、 モデルのパラメタ、予測結果を解釈する際にどうしても数式を読まないとわからない箇所があった。 特にベイズ統計について理解がかなり浅くて、正直なところ理解しきれてはいない。 Splinkの公式ドキュメントを読み進めていった際に気付いたメモを書いてみることにした。 [arst_toc tag=\"h4\"] 環境構築・準備 環境構築の実装部分は主な学習要素ではないのでClaude Codeでサクッと作ってみた。 まずはPlan modeで計画を立ててもらう。これぐらいだとほぼ言い値で使える。 DuckDB、Splinkを使う最小構成の環境を作ります。環境はDocker composeで構築します。計画を立ててください。 ⏺ DuckDB、Splinkを使うDocker Compose環境の計画を完了しました。 計画概要 1. 設計文書: .tmp/design.mdで要件、アーキテクチャ、技術仕様を定義 2. タスク管理: .tmp/task.mdで詳細な実装手順を3フェーズに分けて整理 主要コンポーネント - Jupyter Lab環境: DuckDB、Splink、必要なライブラリがプリインストール - データボリューム: サンプルデータ、notebook、出力用のディレクトリ - 最小構成: 開発・学習用途に特化した軽量環境 次のステップ 実装を開始する場合は、Phase 1から順番に進めます: 1. Docker設定ファイル作成 2. ディレクトリ構造構築 3. サンプル実装 4. テストと検証 計画の詳細は.tmp/design.mdと.tmp/task.mdをご確認ください。実装を開始しますか? 実際にファイルを作ってもらうと以下のような構成となる。 得られる中間成果物である docker-compose.yml、Dockerfile は本記事では省略する。 . ├── data ├── docker-compose.yml ├── Dockerfile ├── notebooks ├── output └── requirements.txt 普通に http://localhost:8888 で JupyterLab が開く。 使用するサンプルデータ 5万人の人物データを使って名寄せを行うサンプル。おそらくSplinkの用途として最初に思いつくやつ。 Splinkにデータをロードする前に必要なデータクリーニング手順について説明がある。 公式によると、まずは行に一意のIDを割り当てる必要がある。 データセット内で一意となるIDであって、重複除去した後のエンティティを識別するIDのことではない。 [clink implicit=\"false\" url=\"https://moj-analytical-services.github.io/splink/demos/tutorials/01_Prerequisites.html\" imgurl=\"https://user-images.githubusercontent.com/7570107/85285114-3969ac00-b488-11ea-88ff-5fca1b34af1f.png\" title=\"Data Prerequisites\" excerpt=\"Splink では、リンクする前にデータをクリーンアップし、行に一意の ID を割り当てる必要があります。このセクションでは、Splink にデータをロードする前に必要な追加のデータクリーニング手順について説明します。\"] 使用するサンプルデータは以下の通り。 from splink import splink_datasets df = splink_datasets.historical_50k df.head() データの分布を可視化 splink.exploratoryのprofile_columnsを使って分布を可視化してみる。 from splink import DuckDBAPI from splink.exploratory import profile_columns db_api = DuckDBAPI() profile_columns(df, db_api, column_expressions=[\"first_name\", \"substr(surname,1,2)\"]) 同じ姓・名の人が大量にいることがわかる。 ブロッキングとブロッキングルールの評価 テーブル内のレコードが他のレコードと「同一かどうか」を調べるためには、 基本的には、他のすべてのレコードとの何らかの比較操作を行うこととなる。 全てのレコードについて全てのカラム同士を比較したいのなら、 対象のテーブルをCROSS JOINした結果、各カラム同士を比較することとなる。 SELECT ... FROM input_tables as l CROSS JOIN input_tables as r あるカラムが条件に合わなければ、もうその先は見ても意味がない、 というケースは多い。例えば、まず first_name 、surname が同じでなければ、 その先の比較を行わない、というのはあり得る。 SELECT ... FROM input_tables as l INNER JOIN input_tables as r ON l.first_name = r.first_name AND l.surname = r.surname このような考え方をブロッキング、ON句の条件をブロッキングルールと言う。 ただ、これだと性と名が完全一致していないレコードが残らない。 そこで、ブロッキングルールを複数定義し、いずれかが真であれば残すことができる。 ここでポイントなのが、ブロッキングルールを複数定義したとき、 それぞれのブロッキングルールで重複して選ばれるレコードが発生した場合、 Splinkが自動的に排除してくれる。 このため、ブロッキングルールを重ねがけすると、最終的に残るレコード数は一致する。 ただ、順番により、同じルールで残るレコード数は変化する。 逆に言うと、ブロッキングルールを足すことで、重複除去後のOR条件が増えていく。 積算グラフにして、ブロッキングルールとその順番の効果を見ることができる。 from splink import DuckDBAPI, block_on from splink.blocking_analysis import ( cumulative_comparisons_to_be_scored_from_blocking_rules_chart, ) blocking_rules = [ block_on(\"substr(first_name,1,3)\", \"substr(surname,1,4)\"), block_on(\"surname\", \"dob\"), block_on(\"first_name\", \"dob\"), block_on(\"postcode_fake\", \"first_name\"), block_on(\"postcode_fake\", \"surname\"), block_on(\"dob\", \"birth_place\"), block_on(\"substr(postcode_fake,1,3)\", \"dob\"), block_on(\"substr(postcode_fake,1,3)\", \"first_name\"), block_on(\"substr(postcode_fake,1,3)\", \"surname\"), block_on(\"substr(first_name,1,2)\", \"substr(surname,1,2)\", \"substr(dob,1,4)\"), ] db_api = DuckDBAPI() cumulative_comparisons_to_be_scored_from_blocking_rules_chart( table_or_tables=df, blocking_rules=blocking_rules, db_api=db_api, link_type=\"dedupe_only\", ) 積算グラフは以下の通り。積み上がっている数値は「比較の数」。 要は、論理和で条件を足していって、次第に緩和されている様子がわかる。 DuckDBでは比較の数を2,000万件以内、Athena,Sparkでは1億件以内を目安にせよとのこと。 比較の定義 Splinkは Fellegi-Sunter model モデル (というかフレームワーク) に基づいている。 https://moj-analytical-services.github.io/splink/topic_guides/theory/fellegi_sunter.html 各カラムの同士をカラムの特性に応じた距離を使って比較し、重みを計算していく。 各カラムの比較に使うためのメソッドが予め用意されているので、特性に応じて選んでいく。 以下では、first_name, sur_name に ForenameSurnameComparison が使われている。 dobにDateOfBirthComparison、birth_place、ocupationにExactMatchが使われている。 import splink.comparison_library as cl from splink import Linker, SettingsCreator settings = SettingsCreator( link_type=\"dedupe_only\", blocking_rules_to_generate_predictions=blocking_rules, comparisons=[ cl.ForenameSurnameComparison( \"first_name\", \"surname\", forename_surname_concat_col_name=\"first_name_surname_concat\", ), cl.DateOfBirthComparison( \"dob\", input_is_string=True ), cl.PostcodeComparison(\"postcode_fake\"), cl.ExactMatch(\"birth_place\").configure(term_frequency_adjustments=True), cl.ExactMatch(\"occupation\").configure(term_frequency_adjustments=True), ], retain_intermediate_calculation_columns=True, ) # Needed to apply term frequencies to first+surname comparison df[\"first_name_surname_concat\"] = df[\"first_name\"] + \" \" + df[\"surname\"] linker = Linker(df, settings, db_api=db_api) ComparisonとComparison Level ここでSplinkツール内の比較の概念の説明。以下の通り概念に名前がついている。 Data Linking Model ├─-- Comparison: Date of birth │ ├─-- ComparisonLevel: Exact match │ ├─-- ComparisonLevel: One character difference │ ├─-- ComparisonLevel: All other ├─-- Comparison: First name │ ├─-- ComparisonLevel: Exact match on first_name │ ├─-- ComparisonLevel: first_names have JaroWinklerSimilarity > 0.95 │ ├─-- ComparisonLevel: first_names have JaroWinklerSimilarity > 0.8 │ ├─-- ComparisonLevel: All other モデルのパラメタ推定 モデルの実行に必要なパラメタは以下の3つ。Splinkを用いてパラメタを得る。 ちなみに u は \"\'U\'nmatch\"、m は \"\'M\'atch\"。背後の数式の説明で現れる。 No パラメタ 説明 1 無作為に選んだレコードが一致する確率 入力データからランダムに取得した2つのレコードが一致する確率 (通常は非常に小さい数値) 2 u値(u確率) 実際には一致しないレコードの中で各 ComparisonLevel に該当するレコードの割合。具体的には、レコード同士が同じエンティティを表すにも関わらず値が異なる確率。例えば、同じ人なのにレコードによって生年月日が違う確率。これは端的には「データ品質」を表す。名前であればタイプミス、別名、ニックネーム、ミドルネーム、結婚後の姓など。 3 m値(m確率) 実際に一致するレコードの中で各 ComparisonLevel に該当するレコードの割合。具体的には、レコード同士が異なるエンティティを表すにも関わらず値が同じである確率。例えば別人なのにレコードによって性・名が同じ確率 (同姓同名)。性別は男か女かしかないので別人でも50%の確率で一致してしまう。 無作為に選んだレコードが一致する確率 入力データからランダムに抽出した2つのレコードが一致する確率を求める。 値は0.000136。すべての可能なレコードのペア比較のうち7,362.31組に1組が一致すると予想される。 合計1,279,041,753組の比較が可能なため、一致するペアは合計で約173,728.33組になると予想される、 とのこと。 linker.training.estimate_probability_two_random_records_match( [ block_on(\"first_name\", \"surname\", \"dob\"), block_on(\"substr(first_name,1,2)\", \"surname\", \"substr(postcode_fake,1,2)\"), block_on(\"dob\", \"postcode_fake\"), ], recall=0.6, ) > Probability two random records match is estimated to be 0.000136. > This means that amongst all possible pairwise record comparisons, > one in 7,362.31 are expected to match. > With 1,279,041,753 total possible comparisons, > we expect a total of around 173,728.33 matching pairs u確率の推定 実際には一致しないレコードの中でComparisonの評価結果がPositiveである確率。 基本、無作為に抽出したレコードは一致しないため、「無作為に抽出したレコード」を 「実際には一致しないレコード」として扱える、という点がミソ。 probability_two_random_records_match によって得られた値を使ってu確率を求める。 estimate_u_using_random_sampling によって、ラベルなし、つまり教師なしでu確率を得られる。 レコードのペアをランダムでサンプルして上で定義したComparisonを評価する。 ランダムサンプルなので大量の不一致が発生するが、各Comparisonにおける不一致の分布を得ている。 これは、例えば性別について、50%が一致、50%が不一致である、という分布を得ている。 一方、例えば生年月日について、一致する確率は 1%、1 文字の違いがある確率は 3%、 その他はすべて 96% の確率で発生する、という分布を得ている。 linker.training.estimate_u_using_random_sampling(max_pairs=5e6) > ----- Estimating u probabilities using random sampling ----- > > Estimated u probabilities using random sampling > > Your model is not yet fully trained. Missing estimates for: > - first_name_surname (no m values are trained). > - dob (no m values are trained). > - postcode_fake (no m values are trained). > - birth_place (no m values are trained). > - occupation (no m values are trained). m確率の推定 「実際に一致するレコード」の中で、Comparisonの評価がNegativeになる確率。 そもそも、このモデルを使って名寄せ、つまり「一致するレコード」を見つけたいのだから、 モデルを作るために「実際に一致するレコード」を計算しなければならないのは矛盾では..となる。 無作為抽出結果から求められるu確率とは異なり、m確率を求めるのは難しい。 もしラベル付けされた「一致するレコード」、つまり教師データセットがあるのであれば、 そのデータセットを使ってm確率を求められる。 例えば、日本人全員にマイナンバーが振られて、全てのレコードにマイナンバーが振られている、 というアナザーワールドがあるのであれば、マイナンバーを使ってm確率を推定する。(どういう状況??) ラベル付けされたデータがないのであれば、EMアルゴリズムでm確率を求めることになっている。 EMアルゴリズムは反復的な手法で、メモリや収束速度の点でペア数を減らす必要があり、 例ではブロッキングルールを設定している。 以下のケースでは、first_nameとsurnameをブロッキングルールとしている。 つまり、first_name, surnameが完全に一致するレコードについてペア比較を行う。 この仮定を設定したため、first_name, surname (first_name_surname) のパラメタを推定できない。 training_blocking_rule = block_on(\"first_name\", \"surname\") training_session_names = ( linker.training.estimate_parameters_using_expectation_maximisation( training_blocking_rule, estimate_without_term_frequencies=True ) ) > ----- Starting EM training session ----- > > Estimating the m probabilities of the model by blocking on: > (l.\"first_name\" = r.\"first_name\") AND (l.\"surname\" = r.\"surname\") > > Parameter estimates will be made for the following comparison(s): > - dob > - postcode_fake > - birth_place > - occupation > > Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: > - first_name_surname > > Iteration 1: Largest change in params was 0.248 in probability_two_random_records_match > Iteration 2: Largest change in params was 0.0929 in probability_two_random_records_match > Iteration 3: Largest change in params was -0.0237 in the m_probability of birth_place, level `Exact match on > birth_place` > Iteration 4: Largest change in params was 0.00961 in the m_probability of birth_place, level `All other >comparisons` > Iteration 5: Largest change in params was -0.00457 in the m_probability of birth_place, level `Exact match on birth_place` > Iteration 6: Largest change in params was -0.00256 in the m_probability of birth_place, level `Exact match on birth_place` > Iteration 7: Largest change in params was 0.00171 in the m_probability of dob, level `Abs date difference Iteration 8: Largest change in params was 0.00115 in the m_probability of dob, level `Abs date difference Iteration 9: Largest change in params was 0.000759 in the m_probability of dob, level `Abs date difference Iteration 10: Largest change in params was 0.000498 in the m_probability of dob, level `Abs date difference Iteration 11: Largest change in params was 0.000326 in the m_probability of dob, level `Abs date difference Iteration 12: Largest change in params was 0.000213 in the m_probability of dob, level `Abs date difference Iteration 13: Largest change in params was 0.000139 in the m_probability of dob, level `Abs date difference Iteration 14: Largest change in params was 9.04e-05 in the m_probability of dob, level `Abs date difference <= 10 year` 同様にdobをブロッキングルールに設定して実行すると、dob以外の列についてパラメタを推定できる。 training_blocking_rule = block_on(\"dob\") training_session_dob = ( linker.training.estimate_parameters_using_expectation_maximisation( training_blocking_rule, estimate_without_term_frequencies=True ) ) > ----- Starting EM training session ----- > > Estimating the m probabilities of the model by blocking on: > l.\"dob\" = r.\"dob\" > > Parameter estimates will be made for the following comparison(s): > - first_name_surname > - postcode_fake > - birth_place > - occupation > > Parameter estimates cannot be made for the following comparison(s) since they are used in the blocking rules: > - dob > > Iteration 1: Largest change in params was -0.474 in the m_probability of first_name_surname, level `Exact match on first_name_surname_concat` > Iteration 2: Largest change in params was 0.052 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 3: Largest change in params was 0.0174 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 4: Largest change in params was 0.00532 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 5: Largest change in params was 0.00165 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 6: Largest change in params was 0.00052 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 7: Largest change in params was 0.000165 in the m_probability of first_name_surname, level `All other comparisons` > Iteration 8: Largest change in params was 5.29e-05 in the m_probability of first_name_surname, level `All other comparisons` > > EM converged after 8 iterations > > Your model is not yet fully trained. Missing estimates for: > - first_name_surname (some u values are not trained). モデルパラメタの可視化 m確率、u確率の可視化。 マッチウェイトの可視化。マッチウェイトは (log_2 (m / u))で計算される。 linker.visualisations.match_weights_chart() モデルの保存と読み込み 以下でモデルを保存できる。 settings = linker.misc.save_model_to_json( \"./saved_model_from_demo.json\", overwrite=True ) 以下で保存したモデルを読み込める。 import json settings = json.load( open(\'./saved_model_from_demo.json\', \'r\') ) リンクするのに十分な情報が含まれていないレコード 「John Smith」のみを含み、他のすべてのフィールドがnullであるレコードは、 他のレコードにリンクされている可能性もあるが、潜在的なリンクを明確にするには十分な情報がない。 以下により可視化できる。 linker.evaluation.unlinkables_chart() 横軸は「マッチウェイトの閾値」。縦軸は「リンクするのに十分な情報が含まれないレコード」の割合。 マッチウェイト閾値=6.11ぐらいのところを見ると、入力データセットのレコードの約1.3%が リンクできないことが示唆される。 訓練済みモデルを使って未知データのマッチウェイトを予測 上で構築した推定モデルを使用し、どのペア比較が一致するかを予測する。 内部的には以下を行うとのこと。 blocking_rules_to_generate_predictionsの少なくとも1つと一致するペア比較を生成 Comparisonで指定されたルールを使用して、入力データの類似性を評価 推定された一致重みを使用し、要求に応じて用語頻度調整を適用して、最終的な一致重みと一致確率スコアを生成 df_predictions = linker.inference.predict(threshold_match_probability=0.2) df_predictions.as_pandas_dataframe(limit=1) > Blocking time: 0.88 seconds > Predict time: 1.91 seconds > > -- WARNING -- > You have called predict(), but there are some parameter estimates which have neither been estimated or > specified in your settings dictionary. To produce predictions the following untrained trained parameters will > use default values. > Comparison: \'first_name_surname\': > u values not fully trained records_to_plot = df_e.to_dict(orient=\"records\") linker.visualisations.waterfall_chart(records_to_plot, filter_nulls=False) predictしたマッチウェイトの可視化、数式との照合 predictしたマッチウェイトは、ウォーターフォール図で可視化できる。 マッチウェイトは、モデル内の各特徴量によって一致の証拠がどの程度提供されるかを示す中心的な指標。 (lambda)は無作為抽出した2つのレコードが一致する確率。(K=m/u)はベイズ因子。 begin{align} M &= log_2 ( frac{lambda}{1-lambda} ) + log_2 K \\ &= log_2 ( frac{lambda}{1-lambda} ) + log_2 m - log_2 u end{align} 異なる列の比較が互いに独立しているという仮定を置いていて、 2つのレコードのベイズ係数が各列比較のベイズ係数の積として扱う。 begin{eqnarray} K_{feature} = K_{first_name_surname} + K_{dob} + K_{postcode_fake} + K_{birth_place} + K_{occupation} + cdots end{eqnarray} マッチウェイトは以下の和。 begin{eqnarray} M_{observe} = M_{prior} + M_{feature} end{eqnarray} ここで begin{align} M_{prior} &= log_2 (frac{lambda}{1-lambda}) \\ M_{feature} &= M_{first_name_surname} + M_{dob} + M_{postcode_fake} + M_{birth_place} + M_{occupation} + cdots end{align} 以下のように書き換える。 begin{align} M_{observe} &= log_2 (frac{lambda}{1-lambda}) + sum_i^{feature} log_2 (frac{m_i}{u_i}) \\ &= log_2 (frac{lambda}{1-lambda}) + log_2 (prod_i^{feature} (frac{m_i}{u_i}) ) end{align} ウォーターフォール図の一番左、赤いバーは(M_{prior} = log_2 (frac{lambda}{1-lambda}))。 特徴に関する追加の知識が考慮されていない場合のマッチウェイト。 横に並んでいる薄い緑のバーは (M_{first_name_surname} + M_{dob} + M_{postcode_fake} + M_{birth_place} + M_{occupation} + cdots)。 各特徴量のマッチウェイト。 一番右の濃い緑のバーは2つのレコードの合計マッチウェイト。 begin{align} M_{feature} &= M_{first_name_surname} + M_{dob} + M_{postcode_fake} + M_{birth_place} + M_{occupation} + cdots \\ &= 8.50w end{align} まとめ 長くなったのでいったん終了。この記事では教師なし確率的名寄せパッケージSplinkを使用してモデルを作ってみた。 次の記事では、作ったモデルを使用して実際に名寄せをしてみる。 途中、DuckDBが楽しいことに気づいたので、DuckDBだけで何個か記事にしてみようと思う。
AirflowでEnd-To-End Pipeline Testsを行うためにAirflow APIを調べてみた話
Airflow自体にDAGの実行結果をテスト(End-To-End Pipeline Tests)する仕組みは無いようで、 以下のような地道な仕組みを自力で作る必要がありそうです。 テストデータを用意する Airflowが提供するAirflow APIを使用してDAGを実行する DAGの終了を待つ 結果をAssertする 他にAirflow CLIも使えそうですが、pythonコードの一部にするならAPIの方が使い勝手が良さそうです。 API仕様書を上から読んでみたので、その感想を書いてみます。 他にもあるのですが、今回の用途に使いそうなものを抜粋しています。 \"読んでみた\"だけなので、誤りがあるかもしれません。概要を理解するぐらいの気持ちで読んでください。 [arst_toc tag=\"h4\"] Airflow API概要 今日時点のAirflow APIのAPI仕様書は以下です。 Airflow API (Stable) (2.10.0) RESTful APIとなっていて、Resourceに対するCRUDをHTTP Methodで表現します。 1つ、update_maskという考え方があります。リソースの値を更新する際、リソースjsonと同時に クエリパラメタで\"変更したい値は何か\"を渡すことで、リソースjsonの該当値のみを更新できます。 resource = request.get(\'/resource/my-id\').json() resource[\'my_field\'] = \'new-value\' request.patch(\'/resource/my-id?update_mask=my_field\', data=json.dumps(resource)) API Authenticationがusername/passwordで雑ですが、 DAGのis_pausedをtrueにするには、以下の通りpatchを叩くようです。 curl -X PATCH \'https://example.com/api/v1/dags/{dag_id}?update_mask=is_paused\' -H \'Content-Type: application/json\' --user \"username:password\" -d \'{ \"is_paused\": true }\' CORSを有効にする必要があります。Enabling CORS 様々なAPI認証が用意されています。API認証はAirflowのauth managerで管理されます。Authentication エラーはRFC7807準拠です。つまり、Unauthenticated、PermissionDenied、BadRequest、NotFound、MethodNotAllowed、NotAcceptable、AlreadyExistsが扱われます。Errors Connections ざっとAPIを眺めていきます。 まずはConnection。順当なCRUDです。patchでupdate_maskが使われます。 コードから一通りConnectionを触れそうです。 Testって何か調べてみました。 デフォルトでdisabledになっていますが、Airflow UI(Connections)から\"Test\"ボタンを押下できます。 Connectionと関連付けられたhookのtest_connection()メソッドを実行するようです。 これと同等の機能が動くようです。 Method Endpoint Overview Response GET /connections List Connection array of objects(ConnectionCollectionItem). POST /connections Create a Connection created connection. GET /connections/{connection_id} Get a connection connection PATCH /connections/{connection_id} Update a connection updated connection DELETE /connections/{connection_id} Delete a connection (Status) POST /connections/test Test a connection (Status) DAG 次はDAG。まずDAG一覧に対する操作。一覧に対してpatchを叩ける様子です。 Method Endpoint Overview GET /dags List DAGs in the database. dag_id_pattern can be set to match dags of a specific pattern PATCH /dags Update DAGs of a given dag_id_pattern using UpdateMask. This endpoint allows specifying ~ as the dag_id_pattern to update all DAGs. New in version 2.3.0 次は個別のDAGに対する操作。 Method Endpoint Overview GET /dags/{dag_id} Get basic information about a DAG.Presents only information available in database (DAGModel). If you need detailed information, consider using GET /dags/{dag_id}/details. PATCH /dags/{dag_id} Update a DAG. DELETE /dags/{dag_id} Deletes all metadata related to the DAG, including finished DAG Runs and Tasks. Logs are not deleted. This action cannot be undone.New in version 2.2.0 GET /dags/{dag_id}/tasks/detail Get simplified representation of a task. GET /dags/{dag_id}/detail Get a simplified representation of DAG.The response contains many DAG attributes, so the response can be large. If possible, consider using GET /dags/{dag_id}. Airflowにおいて、Operatorのインスタンスに\"Task\"という用語が割り当てられています。 つまり、「Operatorに定義した処理を実際に実行すること」が\"Task\"としてモデリングされています。 「\"Task\"をA月B日X時Y分Z秒に実行すること」が、\"TaskInstance\"としてモデリングされています。 あるDAGは、実行日/実行時間ごとの複数の\"TaskInstance\"を保持しています。 以下のAPIにおいて、DAGが保持する\"Task\",\"日付レンジ\"等を指定して実行します。 \"TaskInstance\"を\"Clear(再実行)\"します。また、\"TaskInstance\"の状態を一気に更新します。 Method Endpoint Overview POST /dags/{dag_id}/clearTaskInstances Clears a set of task instances associated with the DAG for a specified date range. POST /dags/{dag_id}/updateTaskInstancesState Updates the state for multiple task instances simultaneously. GET /dags/{dag_id}/tasks Get tasks for DAG. なんだこれ、ソースコードを取得できるらしいです。 Method Endpoint Overview GET /dagSources/{file_token} Get a source code using file token. DAGRun \"Task\"と\"TaskInstance\"の関係と同様に\"DAG\"と\"DAGRun\"が関係しています。 「A月B日X時Y分Z秒のDAG実行」が\"DAGRun\"です。DAGRun。順当な感じです。 新規にトリガしたり、既存のDAGRunを取得して更新したり削除したり、再実行したりできます。 Method Endpoint Overview GET /dags/{dag_id}/dagRuns List DAG runs.This endpoint allows specifying ~ as the dag_id to retrieve DAG runs for all DAGs. POST /dags/{dag_id}/dagRuns Trigger a new DAG run.This will initiate a dagrun. If DAG is paused then dagrun state will remain queued, and the task won\'t run. POST /dags/~/dagRuns/list List DAG runs (batch).This endpoint is a POST to allow filtering across a large number of DAG IDs, where as a GET it would run in to maximum HTTP request URL length limit. GET /dags/{dag_id}/dagRuns/{dag_run_id} Get a DAG run. DELETE /dags/{dag_id}/dagRuns/{dag_run_id} Delete a DAG run. PATCH /dags/{dag_id}/dagRuns/{dag_run_id} Modify a DAG run.New in version 2.2.0 POST /dags/{dag_id}/dagRuns/{dag_run_id}/clear Clear a DAG run.New in version 2.4.0 以下はスキップ.. Method Endpoint Overview GET /dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents Get datasets for a dag run.New in version 2.4.0 PATCH /dags/{dag_id}/dagRuns/{dag_run_id}/setNote Update the manual user note of a DagRun.New in version 2.5.0 DAGWarning DAGのimport_errors一覧を返します。 Method Endpoint Overview GET /dagWarnings List Dag Waranings. DAGStats A DAG Run status is determined when the execution of the DAG is finished. The execution of the DAG depends on its containing tasks and their dependencies. The status is assigned to the DAG Run when all of the tasks are in the one of the terminal states (i.e. if there is no possible transition to another state) like success, failed or skipped. The DAG Run is having the status assigned based on the so-called “leaf nodes” or simply “leaves”. Leaf nodes are the tasks with no children. There are two possible terminal states for the DAG Run: success if all of the leaf nodes states are either success or skipped, failed if any of the leaf nodes state is either failed or upstream_failed. Method Endpoint Overview GET /dagStats List Dag statistics. ImportError Airflow Best PractiveのTesting a DagにDAGのテスト観点に関する記述が(サラッと)書かれています。 まず、DAGは普通のpythonコードなので、pythonインタプリタで実行する際にエラーが起きないことを確認すべし、とのことです。 以下の実行により、未解決の依存関係、文法エラーをチェックします。もちろん、どこで実行するかが重要なので、DAG実行環境と合わせる必要があります。 Airflow APIにより、このレベルのエラーがDAGファイルにあるか確認できるようです。 $ python your-dag-file.py Method Endpoint Overview GET /importErrors List import errors. GET /importErrors/{import_error_id} Get an import error. Variables DAGに記述したくないCredentials等を管理する仕組みで、Airflow UIからポチポチ操作すると作れます。 Variableはkey-valueそのままです。DAGからkeyを指定することで参照できます。 Airflow APIからもVariableをCRUDできます。 Method Endpoint Overview GET /variables List variables.The collection does not contain data. To get data, you must get a single entity. POST /variables Create a variable. GET /variables/{variable_key} Get a variable by key. PATCH /variables/{variable_key} Update a variable by key. DELETE /variables/{variable_key} Delete a variable by key. まとめ RESTfulAPIが用意されているということは、内部のオブジェクトをCRUD出来るということなのだろう、 という推測のもと、Airflow APIのAPI仕様書を読んで感想を書いてみました。 Airflowの概念と対応するリソースはAPIに出現していて、End-To-End Pipeline Testを書く際に、Assert、実行制御を記述できそうな気持ちになりました。 Assert、実行制御、だけなら、こんなに要らない気もします。 API呼び出し自体の煩雑さがあり、Testの記述量が増えてしまうかもしれません。 以下の記事のようにwrapperを書く必要があるかもしれません。 https://github.com/chandulal/airflow-testing/blob/master/src/integrationtest/python/airflow_api.py DAGの入力側/出力側Endに対するファイル入出力は別で解決が必要そうです。 「API仕様書を読んでみた」の次の記事が書けるときになったら、再度まとめ記事を書いてみようと思います。
CustomOperatorのUnitTestを理解するためGCSToBigQueryOperatorのUnitTestを読んでみた話
未知の連携先との入出力を行う際、CustomOperatorを作るという解決策があります。 CustomOperatorを自作した場合、そのテストをどう書くか、という問題が発生します。 ビルトインのGCSToBigQueryOperatorがどうテストされているかを読むと、雰囲気がわかりました。 UnitTestコードを読んで見ましたので、本記事で感想を書いてみます。 https://github.com/apache/airflow/blob/main/tests/providers/google/cloud/transfers/test_gcs_to_bigquery.py 前提となる知識 Airflowのhookについて理解する必要がありました。 フワッとしていますが、コードを読んで使われ方をながめているとイメージが湧いてきます。 hook しばしば外部からデータを入力したり外部へデータを出力する必要が出てくる。 外部と接続する際にcredentialsを保管し使用する必要があるが、 Airflowはconnectionという概念のオブジェクトを用意している。 connection は conn_id により識別される。Airflow UIやCLIから管理できる。 connectionを直接操作するようなlow-levelコードを書くこともできるが、 煩雑にならないよう、外部リソース毎にhookというhigh-levelインターフェースが用意されている。 Connections & Hooks pythonのunittestも理解する必要がありました。 unittestのmockについて以下が参考になりました。 [clink implicit=\"false\" url=\"https://qiita.com/satamame/items/1c56e7ff3fc7b2986003\" imgurl=\"https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcQqktBhX0kv-C4zk1lu0D8T0ExDUFQdNdu9dQ&s\" title=\"Python の unittest の mock\" excerpt=\"Python の unittest を使っていて、mock が何をするものかは分かっているけど、まだちょっと得体が知れない、怖い、という段階があると思います。この段階を克服するために、何も知らない状態から徐々に mock を理解するためのステップを作りたいと思いました。対象は Python 3.x です。\"] UnitTestを読んでいく TestGCSToBigQueryOperatorというクラスにUnitTestメソッドの実装例が書かれています。 python built-inのテストパッケージであるunittestが使用されています。 @mock.patchデコレータを使用しBigQueryHookをpatchしています。 BigQueryHookのmockインスタンスがhookとして渡ります。 hookのreturn_value, side_effectを差し替えてGCSToBigQueryOperatorインスタンスを実行します。 insert_job(),generate_job_id(),split_table_name(),get_job()の差し替えを行なっています。 メソッドの階層をドット(.)で繋いでより深い場所を差し替えられる様子です。 unittestを書いた人はコードが何に依存しているか分かるので、知識に基づいて依存しているものをmockします。 import json from unittest import mock from unittest.mock import MagicMock, call TASK_ID = \"test-gcs-to-bq-operator\" TEST_EXPLICIT_DEST = \"test-project.dataset.table\" WRITE_DISPOSITION = \"WRITE_TRUNCATE\" SCHEMA_FIELDS = [ {\"name\": \"id\", \"type\": \"STRING\", \"mode\": \"NULLABLE\"}, {\"name\": \"name\", \"type\": \"STRING\", \"mode\": \"NULLABLE\"}, ] MAX_ID_KEY = \"id\" JOB_PROJECT_ID = \"job-project-id\" TEST_BUCKET = \"test-bucket\" TEST_SOURCE_OBJECTS = \"test/objects/test.csv\" DATASET = \"dataset\" TABLE = \"table\" GCS_TO_BQ_PATH = \"airflow.providers.google.cloud.transfers.gcs_to_bigquery.{}\" job_id = \"123456\" hash_ = \"hash\" REAL_JOB_ID = f\"{job_id}_{hash_}\" class TestGCSToBigQueryOperator: @mock.patch(GCS_TO_BQ_PATH.format(\"BigQueryHook\")) def test_max_value_external_table_should_execute_successfully(self, hook): hook.return_value.insert_job.side_effect = [ MagicMock(job_id=REAL_JOB_ID, error_result=False), REAL_JOB_ID, ] hook.return_value.generate_job_id.return_value = REAL_JOB_ID hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) hook.return_value.get_job.return_value.result.return_value = (\"1\",) operator = GCSToBigQueryOperator( task_id=TASK_ID, bucket=TEST_BUCKET, source_objects=TEST_SOURCE_OBJECTS, destination_project_dataset_table=TEST_EXPLICIT_DEST, write_disposition=WRITE_DISPOSITION, schema_fields=SCHEMA_FIELDS, max_id_key=MAX_ID_KEY, external_table=True, project_id=JOB_PROJECT_ID, ) \"基づく知識\"は第三者には理解不能ですが、GCSToBigQueryOperator.pyを読むと理由がわかります。 GCSToBigQueryOperatorのexecute(self, context:Context)を読むと、 先頭でBigQueryHookのインスタンスを取得し、BaseOperator由来のself.hookに設定しているようです。 generate_job_id()により、job_idを取得しています。 _use_existing_table()内で、split_table_name()により,ProjectID,Dataset,Tableを取得しています。 mockしたjob_idが既に存在している場合、get_job()で既存を取得しています。 def execute(self, context: Context): hook = BigQueryHook( gcp_conn_id=self.gcp_conn_id, location=self.location, impersonation_chain=self.impersonation_chain, ) self.hook = hook self.source_format = self.source_format.upper() job_id = self.hook.generate_job_id( job_id=self.job_id, dag_id=self.dag_id, task_id=self.task_id, logical_date=context[\"logical_date\"], configuration=self.configuration, force_rerun=self.force_rerun, ) さて、Assertは以下のように書かれています。 GCSToBigQueryOperatorは、Source(GCS)から.csv等を読み込みDest(BigQuery)へ配置するものです。 Destの然るべき場所にテーブルが作られ、値が入ります。 execute()すると、max_id_keyで指定したカラムの最大値が戻るようです。 \"test-bucket\"に配置した\"test/objects/test.csv\"は\"id\",\"name\"の2列からなるCSVで、 例えば\"id\"=\"1\", \"name\"=\"hoge\"ならば、\"id\"列の最大値である1が戻るため、1をassertすればOKです。 result = operator.execute(context=MagicMock()) assert result == \"1\" これだと、分岐をだいぶすっ飛ばしているので、だいぶ薄いカバレッジになるかと思います。 まとめ GCSToBigQueryOperatorのUnitTestを読んでみました。分かってしまうと普通のUnitTestでした。 Source to Destのパターンはだいたい似たようになるのかも、なので、 作るUnitTestも似たような感じになるのかもしれません。
GoogleによるAirflow DAG実装のベスプラ集を読んでみた – その1
GoogleによるフルマネージドAirflow環境であるCloud Composerを使う必要があり、 いそぎでAirflow+Cloud Composerをキャッチアップすることになりました。 Googleが公開するベスプラ集があることを知り、読んでみることにしました。 Cloud Composerと題されていいますが、ほぼAirflowと読み替えて良いのかなと思います。 書かれているのは、少し基本的なシナリオだと思います。 経験に裏付けられたゴリゴリの集合知、というものはタダでは手に入らないのだろうと思います。 スタート地点に立つ際の道しるべ、ぐらいの気持ちです。 おそらく一緒に使うシナリオが多いであろう、データ変換ツールのdbtと競合するものがあります。 大構造としてはAirflow DAGの下にdbt DAGが来るため、Airflow DAGのベスプラを実現する前提で dbt DAGを書いていくものと考えていました。これだけだとバッティングすると思います。 ウェアハウスとは切り離されています。特にBigQueryを前提にするならもう少し踏み込んだ内容と なるはずだと思いますが、ちょっと書かれていないようです。 いったん半分くらい読んでみたので読書感想文を書いてみました。 [clink implicit=\"false\" url=\"https://cloud.google.com/blog/ja/products/data-analytics/optimize-cloud-composer-via-better-airflow-dags\" imgurl=\"https://www.gstatic.com/pantheon/images/welcome/supercloud.svg\" title=\"Airflow DAG の改良による Cloud Composer の最適化\" excerpt=\"このガイドには、Apache Airflow DAG を作成する際に一般的に適用できる、作業項目のチェックリストが掲載されています。これらのチェック項目は、Google Cloud とオープンソース コミュニティが判断したベスト プラクティスに沿ったものとなっています。一連の高パフォーマンスの DAG によって Cloud Composer の動作が最適化され、標準化された作成方法によって、デベロッパーが数百個、あるいは数千個の DAG でも管理できるようになります。チェック項目のそれぞれが、Cloud Composer 環境と開発プロセスにメリットをもたらします。\"] [arst_toc tag=\"h4\"] はじめに ファイル名を標準化します Workflowの特徴を個別に表現するだけでは不十分で、ファイル名が含む部分文字列がサブ機能や属性のインデックスとなっていて欲しい。さらに、ファイル名から機能概要を逆引き推測できたら便利。 作成した DAG ファイルのコレクションを他のデベロッパーが容易に参照できるようにするためです。例: team_project_workflow_version.py DAG は決定的でなければなりません 入力となるデータが同じであれば、出力は常に同じであるべき、という観点だと思う。 例えば、入力となるデータは同じであっても、実行時間に依存して処理を行ってしまうと、 出力が時間依存となる。テストが無茶苦茶大変になるだろうなと思う。 Airflow DAG単体であれば、そう理解に難しくないポイントだとは思う。 しかし、dbt DAGを含めると一気に縛りがキツくなると思う。 特定の入力によって常に同じ出力が生成される必要があります DAG はべき等でなければなりません 大雑把に書けば、ある操作を1回行っても数回行っても結果が同じであることを言う。 これを実現する仕組みの選択は結構悩ましく、「追加する範囲をいったん削除追加する」が簡単。 しかし、この方法だと無駄なスキャン量が発生する。 dbtを使用する場合、incremental modelがべき等性担保の手段として挙げられることが多いが、 実際にべき等性を担保するには考慮しないといけないことがある。 こちらの記事(dbtで「Incremental」を使わずに冪等性を担保する方法について)が詳しい。 DAG を何度トリガーしても、毎回同じ効果 / 結果が得られなければなりません 例えば、以下のように書けば、入力テーブルが変わらない限りべき等となる。 これを行うには、入力テーブルに「ロード日時」といったメタデータが必要となる。 {{ config( materialized=\"incremental\" ) }} ・・・ {%- if is_incremental() %} WHERE ORDERDATE = TO_DATE(\'{{ var(\'load_date\') }}\') {%- endif %} タスクはアトミック、かつ、べき等でなければなりません ちょっと何言っているかわからない書きっぷり。データベースのACID特性のAtomic性を意識する。 ある操作が一連の処理の完了によって達成される場合、部分的に処理の一部が成功する、という 状態になってしまってはいけない。全ての処理が成功したか、全ての処理が失敗したか、のどちらか、 になっていないといけない。 タスクごとに、他のオペレーションとは独立して再実行できる 1つのオペレーションを処理するようにします。タスクがアトミックな場合、そのタスクの一部が成功したことは、タスク全体が成功したことを意味します。 可能な限り単純な DAG にします ちょっと一般的すぎて良くわからない。 「スケジューリングのコスト」って、「実行のコスト」よりもだいぶ小さいんじゃなかろうか、と思うが、 それでも意識しないといけないのだろうか。 ネストされたツリー構造は、そもそも理解しづらくて避けるべきだろう、とは思う。 タスク間の依存関係が少ない単純な DAG にすると、オーバーヘッドが少なくなるため、スケジューリングのパフォーマンスが向上する傾向があります。一般的に、多数の依存関係がある深くネストされたツリー構造よりも線形構造(例: A->B->C)にしたほうが、効率的な DAG になります。 Python の docstring 規則に従って、各関数のファイルの上部にドキュメントを記述してください AirflowはPythonで書けることが最大の特徴なので、そのメリットを発揮するため、docstringでコメント書けよと。 Python の docstring 規則は、他のデベロッパーやプラットフォーム エンジニアが Airflow DAG を理解するために役立ちます。 関数と同様に BashOperator のドキュメントも作成するようにしてください。DAG で bash スクリプトを参照している場合、スクリプトの目的を記したドキュメントがないと、このスクリプトに詳しくないデベロッパーにはトラブルシューティングが困難です。 DAG の作成を標準化する default_args にオーナーを追加します。 タスクを得る(Operatorをインスタンス化する)際に、各Operatorのコンストラクタに引数を与えるが、 複数のOperatorに渡す引数を共通化したい場合には、default_argsをDAG()に与える。 こうすると、Operatorにdefault_argsで設定した引数を与えたことになる。 各Operatorに引数を与えると、defautl_argsをオーバーライドする動作となる。 過去の公式でdefault_argsは、task_id と owner が mandatory(必須) であるとされている。 これについて、Why is \'owner\' a mandatory argument for tasks and dags? という記事がある。 それに続くPRは More detail on mandatory task arguments であり、mandatoryの根拠を聞いている。 歴史的な理由による、で片付いているな。ベスプラではownerに実装者のメアドなどを書けという。 実装担当者を明らかにせよ、という話であればcomitterを見れば良いだけで、ちょっと意味不明。 mandatoryなので、何か入れないといけないなら、とりあえず実装者のメアドを入れておけ、ということか。 import pendulum with DAG( dag_id=\'my_dag\', start_date=pendulum.datetime(2016, 1, 1, tz=\"UTC\"), schedule_interval=\'@daily\', catchup=False, default_args={\'owner\': \'hoge@ikuty.com\'}, ) as dag: op = BashOperator(task_id=\'dummy\', bash_command=\'Hello World!\') print(op.retries) # 2 dag = DAG() ではなく with DAG() as dag を使用します。 Pythonのwith文の仕様。コンテキストマネージャという言う。 try... except... finally をラップするため、リソースの確保と対応する解放が必ず行われる。 with DAG(...):文の下のインデントの中では、各Operatorのコンストラクタにdagインスタンスを 渡さなくてよくなる。 すべてのオペレーターまたはタスクグループに DAG オブジェクトを渡す必要がなくなるようにします。 DAG ID 内にバージョンを設定します。 以下とのこと。あぁ..バージョニングが実装されていないので手動でバージョニングを行うべし、と。 AirflowはDAGをファイルIDで管理しているため、ファイルIDを変更するとUI上、別のDAGとして扱われるよう。 積極的にDAG IDを変更して、Airflow UIに無駄な情報を出さないようにする、というアイデア。 DAG 内のコードを変更するたびにバージョンを更新します。 こうすると、削除されたタスクログが UI に表示されなくなることや、ステータスのないタスクが古い DAG 実行に対して生成されること、DAG 変更時の一般的な混乱を防ぐことができます。 Airflow オープンソースには、将来的にバージョニングが実装される予定です。 DAG にタグを追加します。 公式はこちら。Add tags to DAGs and use it for filtering in the UI 単にUI上の整理のために留まらず、処理の記述に積極的に使うことができる様子。 これを無秩序に使うと扱いづらくなりそうなので、使う場合は用途を明確にしてから使うべきかと思う。 1. デベロッパーがタグをフィルタして Airflow UI をナビゲートできるようにします。 2. 組織、チーム、プロジェクト、アプリケーションなどを基準に DAG をグループ化します。 DAG の説明を追加します。 唐突で非常に当たり前なのだが、あえて宣言することが大事なんだろうと思う。 他のデベロッパーが自分の DAG の内容を理解できるようにします。 作成時には DAG を一時停止します。 なるほど。 こうすると、誤って DAG が実行されて Cloud Composer 環境の負荷が増すという事態を回避できます。 catchup=Falseに設定して自動キャッチアップによるCloud Composer環境の過負荷を避けます。 まず、catchupの前に、Airflowの実行タイミングが直感的でなさそう。 こちらの記事がとても参考になった。【Airflow】DAG実行タイミングを改めて纏めてみた DAGの実行タイミングはstart_dateとschedule_intervalを利用して計算される。 重要なポイントはschedule_intervalの終了時にDAGが実行される、という点。 また、schedule_intervalはウインドウ枠を表している。 例えば 0 11 * * * であれば、毎日11:00-翌日11:00という時間の幅を表す。 start_date=7月15日、schedule_interval=0 11 * * * のとき、 7月15日 11:00から 7月16日11:00までの期間が終わった後、DAGが開始される。 DAGをデプロイする際、デプロイ日時よりも古いstart_dateを設定することができる。 このとき、start_dateからデプロイ日時までの間で、本来完了しているはずだが実行していない schedule_intervalについてDAGを実行する機能がcatchup。 catchup=Trueとすると、これらのschedule_intervalが全て再実行の対象となる。 一方、catchup=Falseとsるうと、これらのうち、最後だけが再実行の対象となる。 (Falseとしても、最後の1回は再実行される) 過去のデータを自動投入するとか、危ないので、確認しながら手動実行すべきだと思う。 もし本当にcatchupするのであれば、計画的にFalseからTrueにすべきだろうし、 その時は負荷を許容できる状況としないといけない。 DAGが完了せずにCloud Composer環境のリソースが保持されることや、再試行時に競合が引き起こされることのないよう、 dagrun_timeout を設定します DAG、タスク、それぞれにタイムアウトプロパティが存在する。それぞれ理解する必要がある。 DAGタイムアウトはdagrun_timeout、タスクタイムアウトはexecution_timeout。 以下が検証コード。job1のexecution_timeout引数をコメントアウトしている。 コメントアウトした状態では、dagrun_timeoutがDAGのタイムアウト時間となる。 検証コードにおいては、タイムアウト時間が15秒のところ、タスクで20秒かかるのでタイムアウトが起きる。 execution_timeout引数のコメントアウトを外すと、DAGのタイムアウト時間が タスクのタイムアウト時間で上書きされ30秒となる。 タスクで20秒かかってもタイムアウトとならない。 from datetime import timedelta from airflow.utils.dates import days_ago from airflow import DAG from airflow.operators.python import PythonOperator def wait(**context): time.sleep(20) defalut_args = { \"start_date\": days_ago(2), \"provide_context\": True } with DAG( default_args=defalut_args, dagrun_timeout=timedelta(seconds=15), ) as dag: job1 = PythonOperator( task_id=\'wait_task1\', python_callable=wait, # execution_timeout=timedelta(second=30) ) ベスプラの言うところは、ちゃんとタイムアウトを設定しろよ、ということだと思う。 インスタンス化で DAG に引数を渡し、すべてのタスクにデフォルトで同じ start_date が設定されるようにします Airflowでは、Operatorのコンストラクタにstart_dateを与えられるようになっている。 同一DAGに所属するタスクが異なるstart_dateを持つ、という管理が大変なDAGを作ることも出来てしまう。 基本的には、DAGにstart_dateを渡して、タスクのデフォルトを揃えるべき、だそう。 DAG では静的な start_date を使用します。 これがベスプラになっているのはかなり助かる。 動的な start_date を使用した場合、誤った開始日が導き出され、失敗したタスク インスタンスやスキップされた DAG 実行を消去するときにエラーが発生する可能性があります。 retries を、DAG レベルで適用される default_args として設定します。 retriesについても、start_dateと同様にDAGレベルで default_args として設定するそう。 なお、タスクのリトライに関する設定には以下のようなものがある。 retries (int) retry_delay (datetime.timedelta) retry_exponential_backoff (bool) max_retry_delay (datetime.timedelta) on_retry_callback (callable) retries (int)は、タスクが\"失敗\"となる前に実行されるリトライ回数。 retry_delay (datetime.timedelta)はリトライ時の遅延時間。 retry_exponential_backoff (bool)はリトライ遅延での指数関数的後退アルゴリズムによるリトライ間隔の待ち時間を増加させるかどうか max_retry_delay (datetime.timedelta)はリトライ間の最大遅延間隔 on_retry_callback (callable)はリトライ時のコールバック関数 適切な再試行回数は 1~4 回です。再試行回数が多すぎると、Cloud Composer 環境に不要な負荷がかかります。 具体的に retries を何に設定すべきか、について書かれている。 ここまでのまとめ ここまでのステートメントがコードになっている。 わかりやすい。 import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator # default_args 辞書を定義して、DAG のデフォルト パラメータ(開始日や頻度など)を指定する default_args = { \'owner\': \'me\', \'retries\': 2, # 最大再試行回数は 2~4 回にすること \'retry_delay\': timedelta(minutes=5) } # `with` ステートメントを使用して DAG オブジェクトを定義し、一意の DAG ID と default_args 辞書を指定する with DAG( \'dag_id_v1_0_0\', # ID にバージョンを含める default_args=default_args, description=\'This is a detailed description of the DAG\', # 詳しい説明 start_date=datetime(2022, 1, 1), # 静的な開始日 dagrun_timeout=timedelta(minutes=10), # この DAG に固有のタイムアウト is_paused_upon_creation= True, catchup= False, tags=[\'example\', \'versioned_dag_id\'], # この DAG に固有のタグ schedule_interval=None, ) as dag: # BashOperator を使用してタスクを定義する task = BashOperator( task_id=\'bash_task\', bash_command=\'echo \"Hello World\"\' )