Gaudiy Tech Blog

Gaudiyの技術、開発組織、カルチャーについてお伝えするブログです

ECチームでの分散トランザクションの課題とOutboxV2の利用について

はじめまして。ファンと共に時代を進める、Web3スタートアップのGaudiyでエンジニアをしている miyamoto です。

Gaudiyでは、ファンコミュニティサービスの「Gaudiy Fanlink」にマイクロサービスアーキテクチャを採用しています。

マイクロサービスアーキテクチャは、サービスの独立性を高め、チームが自律的に開発を進められる強力なパラダイムです。しかし、その分散的な性質から、サービス間のデータ一貫性を保つことには特有の難しさが伴います。

この分散トランザクションにおいて、Gaudiyでは主に Outbox(Transactional Outbox Pattern) を利用していますが、ECチームである課題にぶつかり、その解決策としてGaudiyが独自に開発・改良したOutboxV2という新しい仕組みを導入しました。

今回のブログでは、Outboxの仕組みから、ECチームで生じた問題とその解決策としてのOutboxV2の導入までを詳しくご紹介していきます。マイクロサービスアーキテクチャの一事例として、ご参考になれば嬉しいです。

1. Outboxの仕組み

Outboxは、マイクロサービスアーキテクチャで、データベースの更新とメッセージの送信を確実に両立させるためのデザインです。これにより、サービス間のデータ一貫性を保ちます。

(出典元: microservices.io)

1-1.アトミックな書き込み

サービスは、自身のビジネスデータ(例: Orderテーブル)の更新と、送信したいメッセージの内容を「Outboxテーブル」に書き込む処理を、単一のデータベーストランザクション内で行います。 これにより、データベースの更新と「メッセージを送る」という記録が必ずセットで行われ、どちらか一方が失敗することがなくなります。

1-2.メッセージリレー

メッセージリレーと呼ばれる独立したプロセスが、Outboxテーブルを監視します。Gaudiyでは、テーブルの監視をpollingで実現していました。未送信のメッセージを見つけると、それをメッセージブローカーへ送信し、送信が完了したらテーブルからそのレコードを削除するか、処理済みとしてマークします。

1-3. Outboxテーブル:メッセージの状態管理

// OutboxEventテーブル
CREATE TABLE OutboxEvent (
  OutboxEventId STRING(64) NOT NULL,          -- 一意なイベントID
  Topic STRING(255) NOT NULL,                 -- Pub/Subトピック名
  Data outbox.task.Event NOT NULL,            -- Protocol Buffersのメッセージデータ
  CreatedAt TIMESTAMP NOT NULL OPTIONS (      -- レコード作成時刻
    allow_commit_timestamp = true
  ),
  SentAt TIMESTAMP OPTIONS (                  -- メッセージ送信完了時刻(NULLなら未送信)
    allow_commit_timestamp = true
  ),
  -- FallbackPoller用のシャードID(OutboxEventIdのハッシュ値を8で割った余り)
  -- FallbackPollerについては後述する
  ShardId INT64 NOT NULL AS (MOD(ABS(FARM_FINGERPRINT(OutboxEventId)), 8)) STORED,
) PRIMARY KEY(OutboxEventId);

-- インデックス:未送信メッセージの効率的な検索用
CREATE INDEX OutboxEventBySentAt ON OutboxEvent(OutboxEventId, SentAt);
CREATE INDEX OutboxEventByShardIdSentAtCreatedAt ON OutboxEvent(ShardId, SentAt, CreatedAt);

2. ECサイトの裏側で起きていた「コインが買えていない?」問題

ECチームでは、決済システムや、デジタルグッズ・通常の商品を販売するストア機能の開発を担当しています。 このストアでは、クレジットカードなどの現金決済のほかに、独自の「コイン」でも支払いが可能です。このコインは現金で購入できます。

2-1. どんな問題が起きていたか?

ユーザーがコインを購入する際、複数のマイクロサービスをまたいで行われるため、データの整合性を保つためにOutboxを利用していました。

しかし、この方法では、データベース(Spanner)に「処理は完了したか?」と確認(polling)するために最大で5秒以上かかってしまうことがありました。5sというのは、Gaudiyでpollingするためにデータベース負荷的に許容できる最低間隔が5sだからです。

このタイムラグのせいで、ユーザーがコインを購入してもすぐには反映されず、「購入に失敗したのかな?」と勘違いして、もう一度コイン購入を試みるという問題が発生していました。

2-2. どのように解決したか?

この問題を解決するために、Gaudiyが独自に開発・改良した「OutboxV2」という新しい仕組みを導入しました。

このOutboxV2によって、コイン購入後のタイムラグを短縮し、ユーザーが「コインが買えていない」と誤解することなく、スムーズに買い物ができるようになりました。

次から、OutboxV2について詳しく説明していきます。

3. OutboxV2とは

(便宜上、この後は元々使われていたOutboxをOutboxV1と呼びます)

OutboxV2で最も重要な技術的要素は、Cloud SpannerのChange Streamsを活用したリアルタイムなデータ変更検知です。従来のpolling方式と比べて、劇的な処理時間短縮を実現しています。

3-1. Change Streamsとは

Cloud SpannerのChange Streamsは、データベース内のデータ変更(INSERT、UPDATE、DELETE)をリアルタイムに近い形で捉え、ストリームとして外部に提供する機能です。

OutboxV2では、OutboxEventテーブルへの新しいレコード挿入を即座に検知し、メッセージ処理を開始します。

ref: https://cloud.google.com/spanner/docs/change-streams?hl=ja

// Change Streamsの設定(SQL)
CREATE CHANGE STREAM OutboxEventStream FOR OutboxEvent 
OPTIONS ( 
  value_capture_type = 'OLD_AND_NEW_VALUES',  // 変更前後の値を取得
  retention_period = '7d',                    // 7日間のデータ保持
  exclude_update = true,                      // UPDATE操作は除外
  exclude_delete = true                       // DELETE操作は除外
);
  • exclude_update = true:OutboxEventのSentAt更新は検知する必要がないため除外
  • exclude_delete = true:OutboxEventレコードは通常削除せず、履歴として保持

Cloud Spannerのchange streamsをOutboxテーブルに設定し、データの変更をリアルタイムに近い形でメッセージリレーがsubscribeし、メッセージブローカーへメッセージをpublishします。こうすることで、OutboxV1では実現できなかった高速な分散トランザクションを実現することができました。

3-2. リアルタイム検知の仕組み

OutboxEventテーブルにレコードが挿入されると、Change Streamsが即座にイベントを発火します。このイベントをChangeStreamsWatcherが受信し、処理を開始します。

(以下のtype ChangeRecordについてはこちらを参照してください)

type ChangeStreamsWatcher struct {
    cs       *changestreams.ChangeStreams  // Change Streamsクライアント
    client   database.SpannerClient        // Spannerクライアント
    kch      channel.Channel               // イベントID送信用チャネル
    done     chan struct{}                 // 終了シグナル
}

type Result struct {
    PartitionToken string          `spanner:"-" json:"partition_token"`
    ChangeRecords  []*ChangeRecord `spanner:"ChangeRecord" json:"change_record"`
}

// Change Streamsのイベントを監視・処理
func (cs *ChangeStreamsWatcher) Watch(ctx context.Context, result *Result) error {
    select {
    case <-cs.done:
        cs.Stop()
        return nil
    default:
        // 各変更レコードを処理
        for _, cr := range result.ChangeRecords {
            if cr.HasDataChangeRecords() {  // データ変更イベントの場合
                for _, dcr := range cr.DataChangeRecords {
                    // 重要:分散ロックによる重複処理防止
                    locked, err := cs.Lock(ctx, dcr.ServerTransactionID)
                    if err != nil {
                        return err
                    }
                    if locked {
                        // 既に他のインスタンスが処理中の場合はスキップ
                        return nil
                    }

                    // 変更されたOutboxEventのIDを取得
                    for _, key := range dcr.Mods.GetValues("OutboxEventId") {
                        if eventID, ok := key.(string); ok {
                            // 処理用チャネルにイベントIDを送信(ここがリアルタイム!)
                            cs.kch.Input(eventID)
                        }
                    }
                }
            }
        }
    }
    return nil
}

3-3. 分散ロック機構による重複処理防止

マイクロサービス環境では、同じOutboxEventが複数のインスタンスで同時に処理される可能性があります。インスタンスごとに未処理のOutboxイベントを処理するsubscribeが実行されているので、同じレコードを処理しようとしてトランザクションの解放待ちにならないようにServerTransactionIDを使った分散ロック機構を実装しています。

SpannerのServerTransactionIDは、Cloud Spannerが生成するデータベースレベルで一意なトランザクション識別子です。トランザクションを一意に識別するため、これをロックキーとして使用することで確実に重複処理を防げます。

<OutboxLockテーブル:分散ロックを管理>

CREATE TABLE OutboxLock (
  TransactionId STRING(28) NOT NULL,          -- SpannerのServerTransactionID
  CreatedAt TIMESTAMP NOT NULL OPTIONS (      -- ロック取得時刻
    allow_commit_timestamp = true
  ),
) PRIMARY KEY(TransactionId),
-- 1日経過したロックレコードは自動削除(メモリリーク防止)
ROW DELETION POLICY (OLDER_THAN(CreatedAt, INTERVAL 1 DAY));
  • TransactionId(ServerTransactionID)をプライマリキーとすることで、同じトランザクションによる重複ロック取得を防止
  • ROW DELETION POLICYにより古いロックレコードが自動削除され、テーブルサイズが無制限に増加することを防ぐ
  • ロック期間は通常数秒〜数分程度だが、障害時の安全マージンとして1日間保持
// 分散ロック機構 - OutboxLockテーブルを使用
func (cs *ChangeStreamsWatcher) Lock(ctx context.Context, serverTxID string) (bool, error) {
    // OutboxLockテーブルにTransactionIDを挿入してロックを取得
    ms := []*spanner.Mutation{
        spanner.Insert("OutboxLock",
            []string{"TransactionId", "CreatedAt"},
            []any{serverTxID, spanner.CommitTimestamp},
        ),
    }
    
    _, err := cs.client.Apply(ctx, ms)
    if err != nil {
        // AlreadyExistsエラー = 既に他のインスタンスがロック取得済み
        if grpcerrors.IsAlreadyExists(err) {
            return true, nil  // ロック済みを示すフラグを返す
        }
        return false, fmt.Errorf("failed to acquire lock: %w", err)
    }
    
    // ロック取得成功
    return false, nil
}

3-4. 全体シーケンス

4. OutboxV2の導入結果と課題への対処

OutboxV1では、OutboxテーブルにレコードがINSERTされてからメッセージブローカーにメッセージをpublishするまでに、最長5000ms程度かかっていました。今回、OutboxV2を導入したことで、レコードがINSERTされてからメッセージがpublishするまでに200ms程度で済むようになり、分散トランザクションの大幅な時間短縮に繋げることができました。

一方で、OutboxV2は強力ですが、万能ではありません。Change Streamsはリアルタイム性に優れる一方で、イベントの伝達が保証されない(At-Most-Once)という特性があります。ネットワークの問題やごく稀なクラウド側の障害で、変更イベントをメッセージブローカーへ送信しそこなう可能性がゼロではありません。これにより、「Outboxテーブルへの書き込み」と「メッセージブローカーへのpublish」の間の整合性に課題が生じます。

そこで我々は、この課題を克服するために、Change Streamsのイベントを取りこぼした場合でも最終的な整合性を担保するリカバリー機構として、OutboxV1で利用していた定期的にOutboxテーブルを監視するポーリング処理(Fallback Poller)も併用しています。このハイブリッドなアプローチにより、リアルタイム性を享受しつつ、メッセージ伝達の最終的な到達保証(At-Least-Once)も実現しています。

5. まとめ

Gaudiyで新しく開発したマイクロサービスアーキテクチャについて紹介させていただきました。

今回は「コインの配布ができていない」という誤解をユーザーに与えないように、リアルタイムに近い処理ができるOutboxV2を採用しました。OutboxV1に比べて分散トランザクションが高速になる一方で、アトミック性を確保できないという課題がありましたが、リカバリー機構を導入することでアトミック性を担保するようにしました。分散トランザクションを扱う上で、どうしても高速に処理したいというシーンはさまざまなプロダクトで発生するかと思うので、ぜひ参考にしてみてください。

GaudiyのECチームではこれから、決済基盤の追加開発や海外展開へ向けてのストア開発などやることが山積みの状態です。特に、巨大なIPを抱えての海外展開はこれからという段階で、ハードな開発がまさに始まろうとしています。

決済、ストア、物流など考えることが山積みでたくさんの困難に挑戦できる機会があるので、興味がある方はぜひお話しましょう!

site.gaudiy.com

site.gaudiy.com

Appendix

Outboxについて microservices.io

以下のメルカリさんの記事がわかりやすいです。 engineering.mercari.com