変更プロセッサでCosmosDBの変更フィードを読み取る

Azure

前回は、Azure FunctionsのCosmosDBトリガーを利用してCosmosDBの変更フィードを読み取りました。CosmosDBトリガーを利用すると、具体的な読み取り処理はAzure側に任せることができるので非常に簡単に読み取ることができます。

今回は、変更プロセッサを作成して読み取ってみたいと思います。

環境

言語.NET 6(6.0.413)
CosmosDBServerless
OSWindows 11

変更フィード読み取りに必要なコンポーネント

CosmosDBの変更フィードを変更プロセッサで読み取るには、以下の4つのコンポーネントが必要になります。

  • 監視対象コンテナ:変更フィードの生成元となるコンテナであり、実際にアイテムの追加更新を行う
  • リースコンテナ:変更フィードの読み取り状態を管理するコンテナ
  • コンピューティングインスタンス:変更プロセッサをホストするインスタンス。VMやApp Serviceなど
  • デリゲート:読み取った変更フィードをもとに、具体的な処理を行うコード

今回は監視対象コンテナとして「container3」を使用します。idとnameプロパティを持ったアイテムを「container3」に追加・更新します。リースコンテナは「leaseProcessor」という名前で予め作成しておきます。リースコンテナには誰がどこまで読み取ったかの情報が格納されます。

コンピューティングインスタンスとしてはVMを使用します。デリゲートでは今回は単純に読み取った結果をコンソールに出力するだけの処理を実装します。

変更プロセッサの実装

基本的には以下のサンプル通りに実装を進めていきますので、必要に応じて適宜ご確認ください。

まずは変更プロセッサのコードになります。6行目の「GetChangeFeedProcessorBuilder」で変更フィードを読み取ります。idとnameプロパティを持つクラスを型指定しています。1番目の引数でプロセッサの名前を自由に指定します。2番目には変更があった際に処理を実行する(デリゲート)関数を指定します。

続けて変更プロセッサをホストするインスタンス名と読み取り状態を管理するリースコンテナ名を指定します。

    public static async Task RunChangeFeed(string databaseId, CosmosClient client)
    {

        var leaseContainer = client.GetContainer(databaseId, Program.leaseContainer);
        var monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
        var changeFeedProcessor = monitoredContainer.GetChangeFeedProcessorBuilder<CosmosItem>("changeFeed", HandleChangesAsync)
            .WithInstanceName("instanceName")
            .WithLeaseContainer(leaseContainer)
            .Build();

        Console.WriteLine("Start");
        await changeFeedProcessor.StartAsync();
        Console.WriteLine("Change Feed Processor started");

        Console.WriteLine("Press any key to stop Change Feed Processor...");
        Console.ReadKey();
        await changeFeedProcessor.StopAsync();

    }

12行目でプロセッサを開始します。そしていったんReadkeyで処理を止めておいて、この間にcontainer3に対して変更を加えます。すると「HandleChangesAsync」が実行されます。では続いてその「HandleChangesAsync」を見てみます。

    static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<CosmosItem> changes, CancellationToken cancellationToken)
    {
        Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
        Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
        Console.WriteLine($"SessionToken ${context.Headers.Session}");

        foreach (var item in changes)
        {
            Console.WriteLine($"ID:{item.Id} Name:{item.Name}");
            await Task.Delay(1);
        }
    }

ハンドラーの引数は上記の3つで決まっています。第一引数のcontextには読み取りに関する様々な情報が設定されています。上記のコードではトークンやRUを出力しています。変更フィードの読み取りもCosmosDBのRUが消費されます。最後に9行目で変更されたアイテムのidとnameプロパティをコンソールに出力しています。

では、container3に格納されているアイテムのnameをtestからtest1に更新してみます。するとアプリが変更フィードを読み取りデリゲートが実行されて変更された内容や消費RUなどがコンソールに出力されました。無事問題なく読み取れているようです。

ライフサイクル通知

リースの取得、リースのリリース、例外発生時の3つのイベントを受信してカスタム処理を実装できます。下記の例ではリースの取得とリリース時にコンソールにログを出力する処理を記載しています。

リースを取得すると変更フィードを読み取る準備ができます。変更フィードを読み取る必要がなくなった際やエラー時などはリースをリリースします。

        // リースの取得時に実行する処理
        Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
        {
            var t = DateTime.UtcNow.ToString("HH:mm:ss");
            Console.WriteLine($"Lease {leaseToken} is acquired at {t} and will start processing");
            return Task.CompletedTask;
        };

    // リースのリリース時に実行する処理
        Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
        {
            var t = DateTime.UtcNow.ToString("HH:mm:ss");
            Console.WriteLine($"Lease {leaseToken} is released at {t} and processing is stopped");
            return Task.CompletedTask;
        };

        var changeFeedProcessor = monitoredContainer.GetChangeFeedProcessorBuilder<CosmosItem>("changeFeed", HandleChangesAsync)
            // 上記のデリゲートを登録
            .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
            .WithLeaseReleaseNotification(onLeaseReleaseAsync)
            .WithInstanceName(instanceName)
            .WithLeaseContainer(leaseContainer)
            .Build();

動作確認は次のエラー処理の時に確認します。

エラー処理

デリゲート(HandleChangesAsyn)でハンドルされない例外が発生すると、実行中のスレッドを停止して新たにスレッドを実行します。また新たなスレッド実行後に失敗した処理を再度実行してくれます。それでは1/2の確率で例外を発生させるように変更して動作を見てみましょう。

    static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<CosmosItem> changes, CancellationToken cancellationToken)
    {
        (略)
        foreach (var item in changes)
        {
            Console.WriteLine($"ID:{item.Id} Name:{item.Name}");
        }
        // 1/2で失敗させる
        var r = new Random();
        if (r.Next(0, 2) == 1)
        {
            Console.WriteLine("Error");
            throw new Exception();
        }
    }

実行してみた結果が以下の画像になります。2回連続で失敗してますが3回目で成功しています。また失敗するたびにリースをリリース、取得していることが分かります。またこの間隔は13秒となっていますが、Azureドキュメントでも13秒ごとにリースを更新する旨の記載があります。

またライフサイクル通知で設定したリースの取得・リリースに関するログも問題なく出力されています。

リースの再取得

変更プロセッサは起動時にリースを取得します。このリースは物理パーティションの数だけあるようです。現在試している環境では物理パーティションは1つしかありませんので、リースも1つだけとなります。そのため複数のホストを同時に起動したときに、2つ目のホストはリースを獲得できず待ち状態になります。1つ目のホストが何らかの理由でリースをリリースすると、2つ目のホストがリースを取得できます。

実際にその様子を見てみましょう。インスタンス名を引数で受け取るようにコードを変更しています。first→secondの順に実行すると、firstが先にリースを取得します。secondではリース取得のログは出力されていません。待ち状態であることが分かります。

現時点でリースコンテナを見てみると、Ownerがfirstになっていることが分かります。

その後firstを停止すると、その3秒後にsecondがリースを取得しました。Azureドキュメントでは17秒ごとにリースの取得処理が実行されるようです。

また、リースコンテナのOwnerがsecondに更新されています。

まとめ

今回は変更フィードの読み取りを変更プロセッサを用いて行いました。Azure FunctionsのCosmosDBトリガーと違い、自分で実装しないといけない箇所が多く複雑ではあります、しかし、エラー処理やライフサイクル通知などで細かい設定ができますので、複雑な分カスタマイズ性は高くなっています。

今回も前回と同様に、「最新バージョンの変更フィードモード」で行いました。本当はCosmosDBのアイテムの削除も処理できる「すべてのバージョンと削除の変更フィード モード」を試したかったのですが、.NETではまだサポートされていませんでした。GAしたらまた試してみたいと思います。

コメント