EIP / Pipes and Filters


EIP

Chapter 3 : パイプとフィルター (Pipes and Filters)

多くの企業統合システムにおいて、ある単一のイベントは一連の特定の機能を実行する処理ステップの引き金を引く。 たとえば、ある新規注文がメッセージフォームとしてわれわれの会社に届いたと仮定しましょう。 あるひとつの要求は、盗聴者が顧客の注文を盗聴するのを防ぐために、そのメッセージは暗号化されていることかもしれません。 2番目の要求は、メッセージは、注文は信用できる顧客により発行されたことを保障するためのデジタル証明書のフォームの中に認証情報を含んでいる、ということになります。 加えて、外部の関係者から二重にメッセージが送られることもあります。(すべての有名なショッピングサイトで「いますぐ注文」ボタンを一度だけクリックするという警告があることを思い出してみてください。) 二重出荷と不幸せな顧客を防ぐために、処理ステップが続け開始される前に、二重メッセージを削除する必要があります。 これらの要求に合致するために、もしかしたら二重になっていて、追加の認証データを含む暗号化されたメッセージを、外部からのデータフィールドの無い一連のユニークで単純なプレーンテキスト注文メッセージに変換する必要がある。


独立性と柔軟性を保ちながら、どのように複雑なメッセージ処理を実行すればよいのか。


ひとつの可能性のある解決法は、すべての必要な機能を実行する包括的な「入力メッセージのマッサージモジュール」を書くことです。 しかしながら、そういったアプローチは柔軟性が無くテストするのが難しい。とある処理を加えたり取り除いたりする必要があったらどうなるだろうか。 たとえば、プライベートネットワーク上で暗号化が要求されていない、多くの顧客から発注されたらどうなるだろうか。

すべての機能を単一のコンポーネント内に実装することは、また、再利用の機会を減らすことになる。 小さくてうまく定義されたコンポーネントを作ることで、他の処理でそれらを再利用することが出来る。 たとえば、注文ステータス要求メッセージは暗号化されるかも知れないが、二重処理を取り除く必要は無い。なぜなら、二重のステータス参照要求は一般的に害にはならない。 復号処理機能を分離されたモジュールに分けることで、他のメッセージにこの機能を再利用することができる。

統合ソリューションは、たいがい異質のシステムの集まりに接続する。 その結果、異なる処理ステップは異なる物理マシン上で実行する必要があるかも知れない。個別処理ステップは特定のシステム上でのみ実行できるときのように。 メッセージを復号化するために必要なプライベートキーは指定されたマシン上だけで入手可能で、また、それはセキュリティ上の理由から他のどんなマシンからもアクセスできない、という事が可能になる。 これは、復号化コンポーネントは指定されたマシン上でのみ実行されなければならないのに、他のステップは他のマシン上で実行されるかも知れない、ということを意味する。 同様に、異なる処理ステップは、同じコンピュータ上でさえ別のプロセス内で実行することを妨げるような、異なる言語や技術を用いて実装されるかも知れない。

各機能を別々のコンポーネントに実装することだけでは、まだ、コンポーネント間の依存性を導入してしまう。 たとえば、もし、復号するコンポーネントが認証コンポーネントを復号した結果と共に呼び出すならば、認証機能無しで復号処理機能を使うことが出来ない。 もし、各コンポーネントがシステム内の他のコンポーネントから独立な方法で、既存のコンポーネントを一連の処理ステップに「構成する」ことが出来るなら、それらの依存性を解決できる。 これは、コンポーネントは汎用的な外部インターフェースをさらすことにより、それらは交換可能になる、ということを暗示している。

もし、非同期メッセージングを使用しているなら、あるコンポーネントから他のコンポーネントへメッセージを送るという非同期な側面を活用するべきである。 たとえば、とあるコンポーネントはその先の処理のために結果を待つことなく他のコンポーネントへメッセージを送ることが出来る。 このテクニックを使用して、各コンポーネント内の複数メッセージを平行に処理できる。


とある大きな処理タスクをチャネル(パイプ)で接続された小さくて独立した一連の処理ステップに分割するというパイプとフィルターアーキテクチャスタイルを使おう。


各フィルターはとても単純なインターフェースを公開している。つまり、入力パイプからメッセージを受け取り、メッセージを処理し、出力パイプへその結果を発行する。 そのパイプは次のひとつのフィルターに接続し、ひとつのフィルターから次へ出力メッセージを送る。 すべてのコンポーネントは同じ外部インターフェースを使っているので、そのコンポーネントを異なるパイプへつなぐことにより、それらは異なるソリューションに「構成する」ことができる。 フィルター自身には何も変更無しで、新しいフィルターを追加でき、既存のフィルターを削除でき、それらを新しい列に再構成できる。 フィルターの接続部分は時々「ポート」と呼ばれる。 基本形状として、各フィルターコンポーネントはひとつの入力ポートとひとつの出力ポートを持っている。

我々の例題に適用すると、パイプとフィルターアーキテクチャーは2つのパイプで接続された3つのフィルターとなる。(図を参照) 復号するコンポーネントへメッセージを送るためのひとつの追加のパイプと、受注処理システムへ二重メッセージを除くフィルター(de-duper)からクリアテキストの注文メッセージを送るためのパイプが必要である。 これは、全部で4つのパイプになる。

「パイプとフィルター」は、メッセージングシステムのための基本的なアーキテクチャスタイルを説明している。つまり、個別の処理ステップ(フィルター)は、メッセージチャネル(パイプ)を通じて、お互いに連結されるということである。 この章と後ろの章の多くのパターン(ルーティングや変換パターンなど)は、この「パイプとフィルター」アーキテクチャースタイルを基礎としている。 これは、個々のパターンを大きなソリューションに一体化することを簡単にしている。

「パイプとフィルター」スタイルは、お互いから分離するために、抽象パイプを用いている。 そのパイプは、ひとつのコンポーネントがそのパイプにメッセージを送ることが出来るようにしている。なので、そのコンポーネントが知らない他のプロセスにより後でそれが消費されることができる。 そのようなパイプの明確な実装は、「メッセージチャネル(60)」である。典型的には、「メッセージチャネル(60)」は、フィルター間で、言語、プラットフォーム、場所の独立性を提供している。 これは、依存性、運用面、処理性能という理由から、処理ステップを異なるマシンに動かすという柔軟性をもつ余裕がある。 しかしながら、メッセージ基盤により提供される「メッセージチャネル(60)」は、実際には同じマシン上に同居するならば、とても重量級になりえる。 そのパイプを単純なインメモリキューを使うことで、もっとより効率的になる。 そのため、それらが抽象パイプインターフェースで通信するコンポーネントを設計するのは有用だ。 そのインターフェースの実装は、「メッセージチャネル(60)」かインメモリキューのような代替実装を使用することに、いずれ交換される。 「メッセージングゲートウェー(468)」は、どうやってこの柔軟性のためにコンポーネントを設計するかを説明している。

パイプとフィルターアーキテクチャの潜在的な不都合な点のひとつは、要求されるチャネルの数の多さである。 最初に、チャネルはメモリとCPUサイクルを消費するバッファリングと他の機能を提供するために、チャネルは無制限のリソースではないかも知れない。 また、そのデータはアプリケーション内部形式からメッセージング基盤固有の形式に変換しないといけないので、チャネルへメッセージを発行することは一定量のオーバーヘッドをもたらす。 受信の終端では、このプロセスは逆向きになる。 もし、フィルターの長い連結を使用しているなら、繰り返されるメッセージデータ変換のために、柔軟性を得るかわりに潜在的に低性能という代償をはらうことになる。

「パイプとフィルター」の純粋形式としては、ひとつの入力ポートとひとつの出力ボートだけをもてるようにしている。 「メッセージング(53)」を取り扱うときは、この性質をいくぶんか緩和させることができる。 あるコンポーネントは2つ以上のチャネルからメッセージを消費するかも知れず、また、2つ以上のチャネルへメッセージを出力するかも知れない。(たとえば、「メッセージルーター[78]」) 同様に、複数のフィルターコンポーネントは、ひとつの「メッセージチャネル(60)」からメッセージを消費するかも知れない。 「ポイントトゥーポイントチャネル(103)」は、ただひとつのフィルターコンポーネントがそれぞれのメッセージを消費するということを保障する。

「パイプとフィルター」を使うことはまた、しばしば見逃される利点、テスト容易性をももたらす。 各個別の処理ステップは「テストメッセージ(66)」をそのコンポーネントに渡して、期待される出力と結果メッセージを比べることにより、テストすることが出来る。 孤立して各コア可能をテストしデバッグすることは、より効率が良い。なぜなら、特定の機能のためのテスト機構をあつらえることが出来るからである。 たとえば、暗号機能と復号機能のテストをするために、ランダムなデータを含む多数のメッセージを渡すことが出来る。 各メッセージを暗号化し複合化した後で、それを元のと比べればよい。 一方、認証のテストをするために、システム内で知られたユーザと一致する特定の認証コードと一緒にメッセージを供給する必要がある。

パイプライン処理 (Pipeline Processing)

コンポーネントを非同期の「メッセージチャネル(60)」で接続することによって、連結内の各ユニットが独自のスレッドか独自のプロセス内で処理できるようになる。 あるユニットがひとつのメッセージを処理し終わると、そのメッセージを出力チャネルに送り、すぐに他のメッセージの処理を開始することが出来る。 そのユニットは後続のコンポーネントがそのメッセージを読み込んで処理するのを待つ必要は無い。 このことにより、メッセージが個々のステージを通過するので、メッセージを並立に処理することが出来るようになる。 たとえば、最初のメッセージが復号されたあと、それは認証コンポーネントに渡される。 同時に、次のメッセージは既に復号されることが出来る。(図を参照) 我々は、こういう設定をパイプライン処理という。なぜなら、液体がパイプを流れていくように、メッセージがフィルターを流れていくからである。 厳密な連続処理と比較して、パイプラインで処理することで、システムの処理量をきわめて上昇させることが出来る。

並列処理 (Parallel Processing)

しかしながら、全体のシステム処理量は、連結内の最も遅い処理に制限される。 処理量を向上するために、それを処理する複数の並列インスタンスを配備できる。 このシナリオでは、チャネル上の各メッセージがNこの処理可能なプロセッサーの中のただひとつにより消費させることを保障するために、「競争消費者(Competing Consumers)(502)」と共に「ポイントトゥーポイントチャネル(103)」が必要になる。 これにより、最も時間のかかるプロセスをスピードアップさせ、全体の処理量を改善することが出来るようになる。 しかしながら、こういう構成はメッセージが順序がばらばらで処理されることを招く、ということに気づく必要がある。 もし、メッセージの順序が重要であるなら、各コンポーネントのインスタンスをひとつだけ走らせるか、「並べなおし屋(Resquencer)(283)」を用いなければならない。


(図) 並列処理による処理量の向上


たとえば、メッセージを復号するのが、認証処理よりも、とても遅いと仮定するならば、この図で示した、復号するコンポーネントのインスタンスを3つ並列に走らせるという構成を使用することが出来る。 もし、各フィルターがステートレス(つまり、あるメッセージが処理された後、それは前の状態に戻る)ならば、パラレル化したフィルターは最善である。 これは、二重メッセージを除く(de-dupe)コンポーネントを複数個並列に走らせることは簡単には出来ない、ということを意味する。なぜなら、コンポーネントは既に受信したすべてのメッセージの履歴を保持している、つまり、ステートレスではないためである。

パイプとフィルターの歴史 (History of Pipes and Filters)

パイプとフィルターアーキテクチャーは決して新しい概念ではない。 柔軟性と高処理能力を併せ持つこのアーキテクチャのシンプルな優雅さにより、パイプとフィルターアーキテクチャの人気を簡単に理解することができる。 シンプルなセマンティックスにより、また、アーキテクチャを説明するために、形式的な方法をに使用することができるようになる。

[Kahn] は 1974年に、独立したFIFO(First-In, First-Out)チャネルで接続された並列処理の集まりとして、Kahn Process Networks を解説した。 [Garlan] は、パイプとフィルターを含むさまざまなアーキテクチャーの表現方法のある良い章を含んでいる。 [Monroe] は、アーキテクチャーの表現方法とデザインパターン間の関係について、詳細な取り扱い方法を与えている。 [PLoPD1] は、[POSA] の中に含まれているパイプとフィルターパターンの基礎を確立した、Regine Meunierの「パイプとフィルターアーキテクチャー」を含んでいる。 パイプとフィルターのほとんどすべての統合に関係する実装は、[POSA]の中にある「シナリオ検廚紡海い討い襦それは、キューパイプから取り込み、処理し、そこへ転送する、能動的フィルター(active filter)を使っている。 [POSA]で説明されているパターンは、フィルターからフィルターへ渡しているため、それぞれのエレメントは同じ処理ステップを経験している、と仮定している。これは、一般的に、とある統合シナリオの事例ではない。 多くの例において、メッセージは、メッセージの中身や外部制御に基づき、動的にルーティングされる。 事実、ルーティングは企業統合において実によく発生するし、それ自身のパターンMessage Route(78)としてみとめられている。

語彙 (Vocabulary)

パイプとフィルターについて述べる場合、フィルターという言葉について注意する必要がある。 後で、Message Filter(237)とContent Filter(342)という二つのパターンを加えて定義する。 その両方とも、汎用的なフィルターの特別な場合であると同時に、パターン言語の多くの他のパターンでもある。 言い換えると、パイプとフィルターという意味でのとあるフィルターとするために、パターンは(例えば属性値やメッセージを取り除くといった)ろ過機能を持つ必要は無い。 パイプとフィルターというアーキテクチャーの表現方法の名前を変えることにより、こういった混乱を避けることが出来た。 しかしながら、パイプとフィルターはとても重要で広く議論された概念であり、新しい名前を付けるとむしろより混乱すると思われた。 フィルターという言葉を、それらのパターンは常に注意深く使うことを心がけており、また、汎用的なフィルターが、パイプとフィルターか、ろ過するMessage Filter(237)/Content Filter(342)かの、どちらについて述べているかを明確にすることを心がけている。 もし、それらがまだ混乱していると考えたならば、どんな面倒も起こさないように、汎用的なフィルターを(十分に汎用的だが時々乱用される言葉である)コンポーネントと呼んだ。

パイプとフィルターは連続プロセス通信(Communicating Sequential Processes:CSPs)の概念と同じ事を共有する。 Hoareにより1978年にを導入されたCSPs[CSP]は、並列処理システムで発生する同期問題を説明するために単純なモデルを提供した。 CSPsに備わる基本的なメカニズムは入出力を経由した2つのプロセスの同期である。 入出力は、プロセスAがプロセスBへ出力する準備が出来たことを示し、そして、プロセスBがプロセスAからの入力する準備が出来たことを提示した時に発生する。 もし、それらのひとつが、他方が真でないのに発生するならば、他方が準備できるまで、そのプロセスは待ち行列におかれる。 CSPsは、それらが祖結合ではなく、また、パイプがキューメカニズムも提供しないという点で、統合ソリューションとは違う。 それにも関わらず、学界でのCSPsの広範囲に扱われているという利点を享受できる。

C#とMSMQによる単純なフィルター

以下のコード断片は、ひとつの入力ポートとひとつの出力ポートを持つフィルターの汎用的な基底クラスを示している。 基本実装は受信メッセージの本体を単純にプリントし、出力ポートにそれを送る。 より興味深いフィルターは、Processorクラスをサブクラス化し、そのメッセージの追加動作を実行するためのProcessMessage?をオーバーライドする。それは、メッセージの中身を変換するか異なる出力チャネルへルーティングする。

Processorがとある入力チャネルと出力チャネルへの参照を受け取っている、ということに気づく。 したがって、このクラスは特定のチャネルともどんな他のフィルターとも関係していない。 このことは、複数のフィルターをインスタンス化できて、任意の設定でそれらをお互いに結合できるようにしている。

using System;
using System.Messaging;

namespace PipeAndFilters
{
  public class Processor
  {
    protected MessageQueue inputQueue;
    protected MessageQueue outputQueue;

    public Processor (MessageQueue inputQueue, MessageQueue outputQueue)
    {
      this.inputQueue = inputQueue;
      this.outputQueue = outputQueue;
    }

    public void Process()
    {
      inputQueue.ReceiveCompleted += new ReceiveCompletedEventHandler(OnReceiveCompleted);
      inputQueue.BeginReceive();
    }

    public void OnReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult)
    {
      MessageQueue mq = (MessageQueue)source;

      Message inputMessage = mq.EndReceive(asyncResult.AsyncResult);
      inputMessage.Formatter = new XmlMessageFormatter(new String[]{"System.String,mscorlib"});
      Message outputMessage = ProcessMessage(inputMessage);

      outputQueue.Send(outputMessage);
      mq.BeginReceive();
    }

    protected virtual Message ProcessMessage(Message m)
    {
      Console.WriteLine("Received Message: "+m.Body);
    }
  }
}

この実装は、Event-Driven Consumer(498)である。 Processメソッドはやってくるメッセージのために登録され、毎回メッセージが届くたびに、OnReceiveCompleted?メソッドを呼び出すために、メッセージシステムに伝えられる。 このメソッドは、やってくるイベントオブジェクトからメッセージデータを抽出し、バーチャルメソッドProcessMessage?を呼ぶ。

この単純なフィルターの例は、トランザクショナルではない。 メッセージを処理している間(出力チャネルへそれが送られる前に)、もしエラーが発生したら、そのメッセージは失われる。 本番環境において、このことは一般的には望ましくない。 この問題を解決するために、Transactional Client(484)を見ること。