EIP / Smart Proxy


EIP

Smart Proxy(スマートプロキシ)

一言要約

  • Smart Proxyを用いれば、現行のサービスへの改変を行うことなく、メッセージチャネル上のメッセージを追跡することができる

詳細

ワイヤタップ(Wire Taps)のペアは、コンポーネントを通して流れるメッセージを追跡するために使用することができる。しかしこのアプローチは、コンポーネントが固定された出力チャネルにメッセージをパブリッシュすることを想定している。しかし、多くのサービススタイルコンポーネントは要求メッセージに含まれるリターンアドレスで指定されたチャネルに応答メッセージをパブリッシュします。

どのようにして、要求で指定されたリターンアドレスに応答メッセージをパブリッシュするサービス上のメッセージを追跡することができるだろうか?

サービスを流れるメッセージを追跡するために、要求と応答の両方のメッセージをキャプチャする必要があります。ワイヤータップを使用して要求メッセージを傍受することは非常に簡単です。しかし、サービスはリクエスタの望むリターンアドレスに基づいて、異なるチャネルに応答メッセージをパブリッシュするので、応答メッセージを傍受することは、困難な部分である。

リクエスタが応答メッセージを送信するチャネルを指定することができるので、リターンアドレスのサポートは、要求 - 応答サービスに最も必要とされる。各リクエスタが正しい応答メッセージを抽出するために、固定チャネルに応答メッセージを投稿するようにサービスを変更させることは難しい。一部のメッセージングシステムは、単一の応答キュー内の特定のメッセージのためにコンシューマに「PEEK」を可能にする。しかし、そのアプローチは、特定の実装であり、​​応答メッセージが要求元に戻らない場合には動作しません。

ワイヤータップで説明したように、メッセージを検査するためにコンポーネントを変更することは、必ずしも実現可能でなかったり現実的ではありません。もしカスタムアプリケーションを扱っている場合、アプリケーションのコードを変更できなかったり、アプリケーションの外部にソリューションを実装する必要する必要がある場合があります。とりわけロジックの本質は、テスト·モードまたはプロダクションモードで動作するかどうかによって異なる場合があるため、インスペクションロジックを実装するために、各アプリケーションを必要としたくない場合があります。独立した自己完結型のコンポーネントでインスペクション機能を維持することは、柔軟性、再利用性およびテスト容易性を向上させます。

オリジナルのリクエスタによって供給されるリターンアドレスを格納し、スマートプロキシのアドレスと置き換えるスマートプロキシを使用してください。サービスが本来のリターンアドレスを経由する応答メッセージを送信する時。

図(P490)

スマートプロキシは、要求 - 応答サービスへの要求チャネルに送信されるメッセージをインターセプトします。各着信メッセージのために、スマートプロキシはオリジナルの送信者によって指定されたリターンアドレスを格納します。その後、スマートプロキシがリッスンしている応答チャネルに、メッセージ内のリターンアドレスが置き換えられます。応答メッセージがそのチャンネルに入ってきた時、スマートプロキシが格納しているリターンアドレスを取得し、チャネルへの変更されていない応答アドレスを転送するためにメッセージルータを使用します。

スマートプロキシは、外部サービスがリターンアドレスをサポートしておらず、固定の応答チャネルに応答する場合にも便利です。リターンアドレスのサポートを提供するスマートプロキシによって、そのようなサービスをプロキシすることができます。この場合、スマートプロキシは、分析機能を実行せず、単に正しいチャネルに応答メッセージを転送します。

スマートプロキシはリターンアドレスと、着信応答メッセージを相関させ、適切なチャネルに応答メッセージを転送することができるように、元のリクエスタによって提供されるリターンアドレスを格納する必要があります。スマートプロキシは、2箇所でこのデータを格納することができます。

・メッセージ内 ・スマートプロキシ内

メッセージ内部の戻りアドレスを格納するために、スマートプロキシはメッセージへの返信アドレスをのための新しいメッセージフィールドを追加することができます。要求 - 応答サービスは、このフィールドを返信メッセージにコピーする必要があります。すべてのスマートプロキシは応答メッセージからの特別なメッセージフィールドを抽出し、メッセージからフィールドを削除し、フィールドで指定されたチャネルにメッセージを転送します。このソリューションは、スマートプロキシをシンプルに保ちますが、要求 - 応答サービスでの連携が必要です。要求 - 応答サービスは、修正不可能なコンポーネントである場合、このオプションは使用できない場合があります。

代わりに、スマートプロキシは、例えばメモリ構造やリレーショナルデータベースなどの、専用のストレージにリターンアドレスを格納することができます。スマートプロキシの目的は要求と応答メッセージとの間でメッセージを追跡することであるため、スマートプロキシは、通常、要求メッセージからとにかく応答メッセージに相関させ、両メッセージを分析するために、データを格納する必要がある。このアプローチは、応答メッセージに要求メッセージを相関させることができるようにスマートプロキシに要求する。(訳注:誤植?→correlate the reply message to the response message)多くの要求 - 応答サービスは、要求メッセージから応答メッセージに相関IDのコピーをサポートしています。もしもスマートプロキシが、元のメッセージ形式を変更できない場合は、要求と応答メッセージを相関させるために、このフィールドを使用(乱用)することができます。相関識別子は、複数のリクエスタ間でではなく、単一のリクエスタによる要求間で一意である必要があり、またすべてのリクエスターは、相関識別子を指定しているわけではないので、スマートプロキシは、独自の相関IDを作成する必要があります。サービス応答キューは、現在複数のリクエスタからのメッセージを伝送するので、オリジナルの相関IDを使用することは信頼できません。そのため、スマートプロキシは、元リターンアドレスと共に、オリジナルの相関IDを格納し、それが本来の相関IDを取得し、応答メッセージが到着したときにアドレスを返すことができるように、オリジナルの相関IDを独自の相関IDで置き換えます。多くは、応答メッセージの相関識別子としてリクエストメッセージのメッセージIDを利用する。これは別の問題を呼ぶ。このサービスは今は、スマートプロキシから受け取ったリクエストメッセージのメッセージIDを、スマートプロキシへの応答メッセージにコピーします。スマートプロキシは、本来のの要求メッセージのメッセージIDで、応答メッセージの相関IDを置き換える必要があります。それで、要求が適切に要求を関連付け、メッセージを返信することができます。次の図は、このプロセスを示しています。

図:相関IDとリターンアドレスを保存し、交換する □例:MSMQおよびC#での簡単なスマートプロキシ

スマートプロキシを実装すると、それほど複雑ではありません。次のコードは、2つのリクエスタである、スマートプロキシと単純なサービスからなるシナリオを実装しています。スマートプロキシは、コンソールに表示するためのコントロールバスへのメッセージ処理時間を渡します。

リクエスタは、メッセージIDもしくはメッセージオブジェクトによって提供される数値のAppSpecific?プロパティの、いずれかの方法で相関させることを許したい。

図:シンプルなスマートプロキシの例

コーディングの便宜のために、イベント駆動型メッセージコンシューマを作成するために必要なコードをカプセル化する基本クラスのMessageConsumer?を定義します。クラスを継承することで、必要なメッセージ処理を実行するための仮想メソッドのProcessMessage?を簡単にオーバーロードし、メッセージキューの構成やイベント駆動型の処理を心配する必要がありません。このコードを共通の基本クラスに分離することで、簡単にテストクライアントと、わずか数行のコードで、ダミーの要求 - 応答サービスを作成することができます。

MessageConsumer

public class MessageConsumer
{
   protected MessageQueue inputQueue;
   public MessageConsumer (MessageQueue inputQueue)
   {
       this.inputQueue = inputQueue; 
       SetupQueue(this.inputQueue);
       Console.WriteLine(this.GetType().Name + ": Processing messages from " + inputQueue.Path);
   } 

   protected void SetupQueue(MessageQueue queue)
   {
       queue.Formatter = new System.Messaging.XmlMessageFormatter(new String[] {"System.String,mscorlib"});
       queue.MessageReadPropertyFilter.ClearAll();
       queue.MessageReadPropertyFilter.AppSpecific = true;
       queue.MessageReadPropertyFilter.Body = true;
       queue.MessageReadPropertyFilter.CorrelationId = true;
       queue.MessageReadPropertyFilter.Id = true;
       queue.MessageReadPropertyFilter.ResponseQueue = true; 
}
 
   public virtual void Process()
   {
       inputQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(OnReceiveCompleted); 

       inputQueue.BeginReceive();
   }

   private void OnReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult) 
   {
       MessageQueue mq = (MessageQueue)source;
       Message m = mq.EndReceive(asyncResult.AsyncResult); 
       m.Formatter = new System.Messaging.XmlMessageFormatter(new String[] {"System.String,mscorlib"});

       ProcessMessage(m);
       mq.BeginReceive();
   } 

   protected virtual void ProcessMessage(Message m)
   {
       String text = "";
       try
       {
          text = (String)m.Body;
       }
       catch (InvalidOperationException) {};
       Console.WriteLine(this.GetType().Name + ": Received Message " + text);
   }
} 

出発点としてMessageConsumer?クラスで、スマートプロキシを作成することができます。スマートプロキシは2つのMessageConsumer?、リクエスタからの要求メッセージと、要求 - 応答サービスから返される応答メッセージが含まれています。スマートプロキシは、リクエストと応答メッセージとの間のメッセージデータを格納するハッシュテーブルを持っている。

SmartProxy 
 
public class SmartProxyBase
{
   protected SmartProxyRequestConsumer requestConsumer;
   protected SmartProxyReplyConsumer replyConsumer;
   protected Hashtable messageData;
   public SmartProxyBase(MessageQueue inputQueue, MessageQueue serviceRequestQueue,MessageQueue serviceReplyQueue) 
   {
       messageData = Hashtable.Synchronized(new Hashtable());
       requestConsumer = new SmartProxyRequestConsumer(inputQueue,serviceRequestQueue,serviceReplyQueue, messageData); 
       replyConsumer = new SmartProxyReplyConsumer(serviceReplyQueue, messageData);
   }

   public virtual void Process()
   {
       requestConsumer.Process();
       replyConsumer.Process();
   }
}

SmartProxyRequestConsumer?は比較的簡単です。それは、実際のサービスに送信される新しい要求メッセージのメッセージIDによってインデックス化されたハッシュテーブルに、要求メッセージ(メッセージIDと、リターンアドレス、のAppSpecific?プロパティ、および現在の時刻)からの関連情報を格納します。要求 - 応答サービスは、このメッセージIDを、サービス応答メッセージの関連IDフィールドにコピーします。それでスマートプロキシは、格納されたメッセージデータを取り戻すことができます。SmartProxyRequestConsumer?は、スマートプロキシが応答メッセージのためにリッスンするキューのリターンアドレス(ResponseQueue?プロパティ)も置き換えます。サブクラスが任意の分析を行うことができるように、このクラスの仮想メソッドであるAnalyzeMessage?を含めました。

SmartProxyRequestConsumer
  
public class SmartProxyRequestConsumer : MessageConsumer
{ 
   protected Hashtable messageData;
   protected MessageQueue serviceRequestQueue;
   protected MessageQueue serviceReplyQueue; 
   public SmartProxyRequestConsumer(MessageQueue requestQueue, MessageQueue serviceRequestQueue, MessageQueue serviceReplyQueue, Hashtable messageData)   :base(requestQueue)
   {
       this.messageData = messageData;
       this.serviceRequestQueue = serviceRequestQueue;
       this.serviceReplyQueue = serviceReplyQueue;
   } 

   protected override void ProcessMessage(Message requestMsg)
   {
       base.ProcessMessage(requestMsg);
       MessageData data = new MessageData(requestMsg.Id, requestMsg.ResponseQueue,requestMsg.AppSpecific);

       requestMsg.ResponseQueue = serviceReplyQueue;
       serviceRequestQueue.Send(requestMsg);
       messageData.Add(requestMsg.Id, data);
       AnalyzeMessage(requestMsg);
} 

   protected virtual void AnalyzeMessage(Message requestMsg)
   {
   }
} 

SmartProxyReplyConsumerはサービス応答チャネルをリッスンします。ProcessMessage?メソッドは、SmartProxyRequestConsumer?によって格納された関連する要求メッセージのメッセージデータを取得し、AnalyzeMessage?テンプレートメソッドを呼び出します。そうして相関IDとAppSpecific?プロパティを新たな応答メッセージにコピーし、それを本来の要求メッセージで指定されたリターンアドレスにルーティングします。

SmartProxyReplyConsumer

public class SmartProxyReplyConsumer : MessageConsumer
{
   protected Hashtable messageData;
   public SmartProxyReplyConsumer(MessageQueue replyQueue, Hashtable messageData) : base(replyQueue) 
   {
       this.messageData = messageData;
   }

   protected override void ProcessMessage(Message replyMsg)
   {
       base.ProcessMessage(replyMsg);
       String corr = replyMsg.CorrelationId;
       if (messageData.Contains(corr))
       {
          MessageData data = (MessageData)(messageData[corr]);
          AnalyzeMessage(data, replyMsg);
          replyMsg.CorrelationId = data.CorrelationID;
          replyMsg.AppSpecific = data.AppSpecific;
          MessageQueue outputQueue = data.ReturnAddress;
          outputQueue.Send(replyMsg);
          messageData.Remove(corr);
       } else {
          Console.WriteLine(this.GetType().Name + "Unrecognized Reply Message");
          //send message to invalid message queue
       }
   }

   protected virtual void AnalyzeMessage(MessageData data, Message replyMessage)
   {
   }
}

メトリックを収集し、それらをコントロールバスに送信するために、SmartProxy?SmartProxyReplyConsumerクラスの両方をサブクラス化する。MetricsSmartProxy?AnalyzeMessage?メソッドの簡単な実装を含み、コンシューマとしてSmartProxyReplyConsumerMetrics?をインスタンス化します。このメソッドは、要求と応答との間の実行中のメッセージを計算し、このデータを未処理のメッセージの数と一緒にコントロールバスキューに送信する。より複雑な計算を実行するには、簡単にこのメソッドを拡張するyことができる。コントロールバスキューは、ファイルに各着信メッセージを書き込む、単純なファイルライターに接続されている。

MetricsSmartProxy

public class MetricsSmartProxy : SmartProxyBase
{  
public MetricsSmartProxy(MessageQueue inputQueue, MessageQueue serviceRequestQueue, MessageQueue serviceReplyQueue, MessageQueue controlBus) :
    base (inputQueue, serviceRequestQueue, serviceReplyQueue)
   {
       replyConsumer = new SmartProxyReplyConsumerMetrics(serviceReplyQueue,messageData, controlBus);
   } 
} 
SmartProxyReplyConsumerMetrics 

public class SmartProxyReplyConsumerMetrics : SmartProxyReplyConsumer
{ 
   MessageQueue controlBus;
   public SmartProxyReplyConsumerMetrics(MessageQueue replyQueue, Hashtable messageData, MessageQueue controlBus) : base(replyQueue, messageData)
   {
       this.controlBus = controlBus;
   } 

   protected override void AnalyzeMessage(MessageData data, Message replyMessage)
   {
       TimeSpan duration = DateTime.Now - data.SentTime; 
       Console.WriteLine(" processing time: {0:f}", duration.TotalSeconds);
       if (controlBus != null) 
          {controlBus.Send(duration.TotalSeconds.ToString() + "," + messageData.Count); 
       }
   }
}

ソリューションは以下のようなクラス図になります:

図:スマートプロキシの例 クラス図

プロキシをテストするために、0から200ミリ秒の間のランダムな間隔を待つ、何もしないダミーの要求 - 応答サービスを作成した。それぞれ100ミリ秒間隔で30のメッセージをパブリッシュする2つ​​のリクエスタをスマートプロキシに与えます。

デモの目的のために、結果のコントロールバスファイルをMicrosoft Excelのスプレッドシートにロードし、見栄えの良いグラフを作成しました:

図:スマートプロキシによって収集された応答時間の統計をコントロールバスコンソールにより可視化

13のメッセージがキューに入れられるまで、キューサイズ応答時間は着実に増加していることが見て取れます。その時点で、リクエスタは、もはや新しいメッセージを送りませんし、キューサイズは着実に減少します。応答時間も同様に減少したが約1秒残っている。なぜなら、現在処理中のメッセージがその長いの要求キューに座ってきているので。

関連パターン:Correlation Identifier, Message Router, Request-Reply, Return Address, Wire Tap


担当者のつぶやき

  • もう少しメッセージ機構上全体にルールを設けていくのもいいんじゃないかと思いました。メッセージ上に応答用のチャネルスタックをおけるだとか、全体のルールを整備していく方向も必要な気がしました。
  • Dropbox使います、。

みんなの突っ込み