EIP / Polling Consumer


Polling Consumer

一言要約

受け手の準備が整ってから処理を進めたいときには、ポーリングを使えばいい。

Context

アプリケーションで、Messageを受け取る必要がある。 そのときに、メッセージを受け取るタイミングをアプリケーション側でコントロールしたい。

Problem

アプリケーション側での準備が整ってからメッセージを受け取れるようにするには?

Force

メッセージコンシューマーの存在目的はたったひとつ。メッセージを受け取ることだ。 ただ、新しいメッセージがあるってことを、コンシューマーはどうやって知るのだろう? いちばん簡単なのは、コンシューマーが定期的にチャネルをチェックして、メッセージが届いているかどうかを調べるという方法だ。 これをポーリングという。


Solution

そのアプリケーションでPolling Consumerを使う。 これは、メッセージを受信したいときに明示的に受信呼び出しをするというものだ。

このパターンはSynchronous Receiverと呼ばれることもある。今回Polling Consumerという名前を採用した理由は、 受信者がメッセージをポーリングしてはそれを処理し、そして次のメッセージをポーリングする、という流れを意識したからだ。

便宜上、メッセージングAPIは二種類の受信メソッドを用意していることが多い。 メッセージが存在しなかった場合の処理が違っていて、一方はメッセージが届くまで処理をブロックするけれどももう一方はただちに呼び出し元に制御を戻す。

Polling Consumerは、アプリケーションがメッセージを明示的にリクエストして受信するときに使うオブジェクト。 アプリケーション側でメッセージを処理する準備が整ったら、コンシューマーをポーリングし、 コンシューマーがメッセージングシステムからメッセージを取得してそれを戻す(次のシーケンス図を参照)。

ただし、コンシューマーがメッセージングシステムからメッセージを取得する方法は実装依存であり、必ずしもポーリングしているとは限らない。

Polling Consumerを使えば、メッセージを同時にどの程度処理するのかをアプリケーション側で制御できるようになる。アプリケーション側の能力に合わせて、ポーリングするスレッドの数を絞ればいい。 これで、アプリケーション側でメッセージをさばききれなくなる事態を回避できる。処理しきれないメッセージはキューに入り、後で処理することになる。

受信側のアプリケーションは、監視したいチャネルごとに少なくとも一本のスレッドを用意するのが一般的だ。 しかし、単一のスレッドに複数のチャネルを監視させることもある。めったにメッセージがやってこないチャネルを監視する場合などはそうするだろう。 単一のチャネルだけを監視するスレッドの場合は、メッセージが届くまでブロックするバージョンの受信メソッドを使う。 一方、複数のチャネルを監視するスレッドの場合は、メッセージがなければすぐに制御を戻すバージョンの受信メソッドを使う。

他のパターンとの関連

  • あまりにも頻繁にポーリングしたり、長時間スレッドをブロックしたりするのは非効率的。そんなときにはEvent-Driven Consumerを使うほうがいい。
  • 複数のPolling Consumerを、Competing Consumersとして使うことができる。
  • Message Dispatcherは、Polling Consumerとして実装できる。
  • Polling Consumerは、Selective Consumerにもなり得るし、Durable Subscriberにもなり得る。
  • Polling Consumerは、Transactional Clientにもなり得る。コンシューマーが、メッセージが実際にチャネルから削除されるタイミングを制御できる。

Examples

JMS Receive

JMSの場合、メッセージコンシューマーはMessageConsumer?.receiveでメッセージを同期的に処理する。

MessageConsumer?には三種類の受信メソッドがある。

  1. receive()―メッセージが取得できるようになるまでブロックし、メッセージを受け取ったら返す。
  2. receiveNoWait?()―メッセージが存在するかどうかを一度だけチェックし、受け取ったメッセージあるいはnullを返す。
  3. receive(long)―メッセージが取得できるようになるまでブロックし、メッセージを受け取ったら返す。ただし、タイムアウトが発生したらnullを返す。

たとえば、コンシューマーを作ってメッセージを受信するコードは、次のようにシンプルなものになる。

Destination dest = // ターゲットの取得
Session session = // セッションの作成
MessageConsumer consumer = session.createConsumer(dest);
Message message = consumer.receive();

.NET Receive

.NETの場合は、コンシューマーはMessageQueue.Receiveを使ってメッセージを同期的に処理する。

MessageQueueクライアントにはいくつかの受信方法がある。最もシンプルなものが次の二つだ。

  1. Receive()―メッセージが取得できるようになるまでブロックし、メッセージを受け取ったら返す。
  2. Receive(TimeSpan?)―メッセージが取得できるようになるまでブロックし、メッセージを受け取ったら返す。ただし、タイムアウトが発生したらMessageQueueException?を投げる。

既存のキューからメッセージを取得するコードは極めてシンプルだ。

MessageQueue queue = // キューの取得
Message message = queue.Receive();

担当者のつぶやき

みんなの突っ込み