7.イベントの取り扱いと複数I/Oストリーム
6章で作成したようなサーバアプリケーションの多くは、たくさんのイベントをさばくように作られている。しかし、このイベント処理部分はプロジェクトごとに何度も新規に作られる場合が多い。
ACE ではリアクターフレームワークを使うことにより、イベントハンドリングを次の3段階で作成することができる。
- ACE_Event_Handler クラスを継承し、アプリケーション独自のイベントハンドリング部分をコールバック関数として実装する。
- ACE_Reactor クラスにイベントハンドラオブジェクトを登録する。
- ACE_Reactor イベントループを開始する。
7.1 リアクターフレームワークの概観
ネットワーク接続のような複数の I/O ソースを利用する伝統的なアプリケーションの場合、接続ごとにプロセスやスレッドを作成する設計がよく見られる。これらは一般的ではあるが、オーバーヘッドやメンテナンス性が問題となる。
これから紹介するモデルはリアクティブモデルと呼ばれ、select() や WaitForMultipleObjects() などのイベントデマルチプレクサを元にしている。この仕組みでは複数の接続を単一のプロセス・スレッドで管理することができる。
この章で解説するクラスは次の通りである。
- ACE_Reactor
- ACE_Event_Handler
- ACE_Time_Value
- ACE_Sig_Set
- ACE_Acceptor
- ACE_Connector
- ACE_Svc_handler
7.2 複数I/Oソースの取り扱い
サーバやクライアント、P2P などのプログラムでは複数の I/O ソースを同時に取り扱わなければいけない。こんな場合にリアクターフレームワークが役に立つ。
6章で作成した基本的なサーバの場合は、接続を受け付けるハンドラとクライアントとの通信を行うハンドラを準備する。そして通信ハンドラを N 個、接続受付ハンドラを 1個の、合計 N+1 ハンドラがリアクターに登録されることになる。
7.2.1 接続受付ハンドラ
6.3節で ACE_SOCK_Acceptor を使って接続を受け付けるようにしたが、今回はそれをイベントハンドラの中にラップする。ソースコードの例は p.143-149 に記載され、各部分の説明が挟まれている。
ACE_Event_Handler で実装しなければいけないメソッドは handle_input と handle_close、それに get_handle の三つである。ClientAcceptor では、それに加えて受付処理開始のために open メソッドを実装する。
open メソッドは p.144-145 のような内容になり、ACE_SOCK_Acceptor::open() で受付を開始した後に、accept() で接続を待つ代わりに自分自身をリアクターの register_handler() で登録する。
リアクターベースの場合、main() 関数は p.145 のようになる。利便性のため ACE_Event_Handler::reactor() メソッドでリアクターインスタンスへのポインタを設定している所が問題に見えるかもしれないが、シングルトンを利用したこのパターンで記述している限りは ACE 内部でうまく処理してくれるので安心してよい。
接続を受け付ける準備ができたら、あとは ACE_Reactor::run_reactor_event_loop() を呼ぶだけである。このメソッドはハンドラの処理中にエラーが起きるか end_reactor_event_loop() が呼ばれるまでイベントを待って処理し続ける。
実際に接続が届くと、先程 open() で設定しておいた ACCEPT イベントに関連付けられているハンドラ(すなわちClientAcceptor のインスタンス)の handle_input() メソッドがコールされる。このメソッドの実装は p.146-147 の通りである。
この中で呼ばれている ClientService::open() の実装は p.148 のようになっている。(他の部分は p.149 以降にある)
ACE_Event_Handler のコールバック関数が返す返り値は p.149 表7.1 のように決まっている。実装する時には注意しておくこと。
7.2.2 入力の処理
クライアントとの通信を行う ClientService クラスについては p.149-151 に記載されている。ACE_SOCKET_Stream オブジェクトを返す peer() メソッドに注意。
handle_input() メソッドが複雑になっているが、これは6章のシンプルサーバが同時に1つのクライアント接続しか扱わなかったのに対し、現在は複数の接続を同時に扱うための変更を含んでいるためである。
send() による送信後にあるコード群は、バッファの内容全てを送信しきれなかった場合に、一旦蓄積しておいて送信可能になった時点で再度試みるためのものだ。利用している ACE_Message_Queue については12章で解説する。より詳しくは C++Network Programming Vol2 を参照してほしい。
7.2.3 出力のハンドリング
出力ハンドリング部分はポータブルに実装するために込み入ったコードになってしまっている。これはリアクターの実装の種類によるもので、select() の場合と WaitForMultipleObjects() の場合で挙動が異なっているため、どちらでも正しく動作するようにコーディングするのは難しい。コード例は p.153-154 にある。
7.3 シグナル
POSIX でシグナルを扱うには signal() システムコールに応答したいシグナル番号とハンドラ関数へのポインタを渡して登録する方法が一般的だ。しかしながら、UNIX 間において仕様が違う場合があり、ポータブルなコードを書くことは難しい。
ACE では ACE_Event_Handler を継承して handle_signal() メソッドをオーバーライドすることでシグナルへ対応できる。サンプルコードは p.156-158 である。
7.3.1 シグナルの捕捉
先程のサーバではサーバ自体を止める方法が無い(強制終了のみである)。そこで、SIGINT シグナルを受け取ったら終了するようにコードを追加してみよう。
p.156 のクラスではコンストラクタでリアクターの register_handler() を用いて自身をシグナルハンドラとして登録している。これでシグナルへの対応ができたことになる。
7.3.2 単一のハンドラで複数シグナルを捕捉する
続いて、複数のシグナルにハンドラを関連付けしたい場合である。上の方法で register_handler() を複数回コールすれば、とりあえずの目的は達成できる。しかし、ACE では一度のコールで用件を済ませる方法もある。p.157-158 の例がそれである。
ACE_Sig_Set 型のオブジェクトを用意し、設定したいシグナルを sig_add() で与え、それを register_handler() に渡せばよいのだ。ACE_Sig_Set にはこの他にも次のような関数が定義されている。
- sig_del:セットの中から該当するシグナルを外す。
- is_member:該当のシグナルがセットに含まれているかどうかを返す。
- empty_set:セットの中のシグナル全てを外す。
- fill_set:有効なシグナル全てをセットに含める。
7.4 お知らせ(Notification)
一つ前の節でシグナルによってログ出力を切り替えるハンドラについて説明しているが、その handle_signal() メソッドには実際のログ出力への操作は何も書いていない。これはシグナル処理は特別な状況なので、なるべく短かく簡潔に処理を終わらせなければならないからである。このため、実際のログ切り替えには Notification 機能を利用している。
Notification 機能は ACE_Reactor::notify() メソッドを呼ぶことでイベントをキューに登録し、イベントループの最中に処理してもらうシステムである。このイベントの処理には handle_exception() ハンドラメソッドが利用される。
今回の例(p.159)ではシグナル時にフラグを設定しておき、handle_exception() 内でそのフラグを参照してログ出力を切り替えている。
7.5 タイマー
アプリケーションで定期的に処理を行いたい場合がある。このような時は伝統的な手法だとスレッドやプロセスを追加して、その中で sleep() を使って時間を調節して処理を行う。このような場合のコード例は p.160-161 のようになる。
しかし、このアプローチには以下のような問題がある。
- ポータブルではない。低レベルな fork() や kill() を利用しているため、プラットフォーム間での互換性が無い。
- 多くのタイマを使う際にリソースが必要になる。各タイマごとにスレッドやプロセスを利用するため、リソースを消費する。
- タイマに対する操作が行えない。一度生成したら kill() で終了させるくらいしかできない。中断や再開、時間の変更などを行いたい場合がある。
7.5.1 リアクタによるタイマのスケジューリング
リアクターを用いたタイマの場合、ACE_Event_Handler を利用して handle_timeout にタスクを実装(例 p.162)する。その後、ACE_Reactor::schedule_timer() メソッドでリアクターに登録を行う。
このとき handle_timeout() に渡る current_time 変数の内容はシステム時刻ではない。(訳注:「ハンドラがディスパッチされた時刻」と書いてあるが、どの時点なのか不明)
7.5.2 状態データ
handle_timeout() メソッドには状態オブジェクトを渡すこともできる。まずは、通常と同様にハンドラを作り、第二パラメータからオブジェクトを取得するようにコードを書く。そして schedule_timer() の第二パラメータにオブジェクトのポインタを渡してやる。このコード例は p.163-166 に記載されている。
7.5.3 タイマIDの利用
schedule_timer() メソッドの返り値はタイマIDとして知られている。これを利用することでタイマの間隔を再設定したり、タイマをキャンセルしたりできる。サンプルは p.166-168 にある。
はじめの SigIntHandler がタイマ間隔の変更で、次の SignalHandler がタイマの中断を行うようにした例である。
7.6 アクセプターコネクターフレームワークの使い方
これまでの例では ACE_SOCK_Acceptor を使って接続を待ち、準備ができたらサービス用のハンドラに渡して処理を行っていた。ここではそれらをフレームワークを利用して実装してみる。
アクセプターコネクターフレームワークの概念図は p.169 の表7.1 のようになっている。一目見てもよくわからないかもしれないが、解説を読んでいくうちに理解できるだろう。
7.6.1 ACE_Acceptor と ACE_Svc_Handler の利用
まずは p.170 の上の例を見てほしい。これだけで ACE_SOCK_Acceptor で接続を待ち、要求が来たら ClientService にまわす処理が記述できたことになる。あとは p.170 中段のように open() を呼ぶだけである。
ClientService クラスは ACE_Svc_Handler テンプレートクラスから継承して実装する。p.170-171 に宣言が載っている通り、ストリームの型と同期のタイプを指定してやる必要がある。このうち、同期のタイプは12章で ACE_Task を扱う時に説明するが、今は ACE_NULL_SYNCH を指定しておけばよい。
また、get_handle() や peer() メソッドも無くなっているが、これは ACE_Svc_Handler が用意してくれるためである。なお、super を typedef しているのは、スーパークラスのメソッドを呼びやすくするためだ。
ClientService の open() は p.172 のようになる。アドレスをデバッグ出力しなくてよいなら、(最初に呼んでいる)スーパークラスの open() で済んでしまうため、この実装自体不要となる。
ちなみに open() に渡されるパラメータは、このハンドラを作成した ACE_Acceptor へのポインタである。デフォルトの open() の実装はハンドラを READ イベントを対象としてリアクタに登録することである。もし open() が -1 を返した場合、フレームワークはハンドラの close() を呼び出す。こちらのデフォルト実装はハンドラを delete するようになっている。
handle_input() および handle_output() の改訂版は p.173-175 に載っている。違いは次の点である。
- ACE_SOCK_Stream へのアクセスに ACE_Svc_Handler::peer() を利用している。
- ACE_Message_Queue へのアクセスに ACE_Task::msg_queue() を利用している。
- アイテムのキューへの投入に ACE_Task::putq() を利用している。
ここまで見て来たように、ACE_Svc_Handler を利用することで劇的にコードが単純化され、本来の目的に集中できるようになる。
7.6.2 テンプレートに関する詳細>
この実装ではテンプレートを利用しているため、そのインスタンス化を行っておくべきである。それには p.175 のコードが必要となる。細かいことではあるが、各種のコンパイラに対応してポータブルなアプリケーションを作るためには重要な作業だ。
7.6.3 ACE_Connector の利用と、その他の機能
ACE_Connector はアクセプターコネクターフレームワークのもう一つの主役である。これは ACE_Acceptor と同様に ACE_Svc_Handler クラスのオブジェクトをハンドラとして接続処理を行う。さきほどのサーバと接続するためのクライアントのサンプルを p.176-181 で解説する。
まずこの例で気になるのは ACE_Reactor_Notification_Strategy クラスだろう。これはストラテジーパターンに則ったクラスで、該当のクラスを変更せずにその振る舞いを変更できるようにするものだ。実際の利用例が p.177-178 の open() に含まれている。
スーパークラスの open() の呼び出しに成功したら、ACE_Message_Queue に対して notification_strategy() メソッドを呼び出している。少し戻って notifier_ のコンストラクタを見ると、ハンドラ自身のポインタと WRITE_MASK を指定して作成している。ここで与えている 0 はリアクタへのポインタである。(コンストラクト時点ではアドレスが不明なので 0 にしてある。)notification_strategy() の呼び出しの前に notifier_ にリアクタのポインタを設定し、それからこのメソッドの呼び出しを行うことで ACE_Message_Queue のストラテジを ACE_Reactor_Notification_Strategy へと設定し、キューに ACE_Message_Block が投入されるたびに設定されたストラテジの notify() メソッドを呼び出すようにしている。このテクニックは ACE_Message_Queue への投入イベントに対して、イベントループ内で処理を行わせたい場合に有用である。
p.178-179 の handle_input() は受信したデータをログ出力してエラー処理をしているだけである。注意点は受信文字列が 0終端でないため、受信バイト数を利用して表示していることだ。
handle_timeout() では設定した間隔(2秒おき)で繰り返しを行い、繰り返し回数を含めた文字列を構築して putq() でキューへ投入する。すると、notifier_ の設定により handle_output() がコールされることになる。
p.180 に handle_output() のソースがある。ここではキューからメッセージを取り出し、送信して(もしあれば)残りをキューへ戻す。また、キューが空になった場合はリアクタに対して cancel_wakeup() でこのハンドラの呼び出しを抑制する。
最後に main() 関数は p.181 のようになる。この例で pc を 0 とした場合には、実行に伴い Client のオブジェクトが動的に割り当てられる。この時は handle_close() によって解放処理が行われるように設定されるため、プログラマが明示的に解放する必要はない。また、例のように普通にポインタを与えた場合には自動解放は行われず、通常通り解放はプログラマの責任となる。この詳しい仕組みについては C++ Network Programming Vol.2 を参照してほしい。
7.7 リアクターの実装
大部分のアプリケーションでは、デフォルトの ACE_Reactor::instance() が返すリアクタで事足りると思う。しかし、状況によってはリアクタの実装を別のものにしたい時もある。このような場合には p.182 のように処理すればよい。
ACE_Reactor のコンストラクタと ACE_Reactor::instance() に渡されている第二引数は、該当のオブジェクトが削除された時に第一引数のオブジェクトを自動解放するかどうかを示すフラグである。
7.7.1 ACE_Select_Reactor
ACE_Select_Reactor は windows 以外で使われるデフォルトのリアクタ実装である。これは1スレッドのみから利用するようにデザインされており、このリアクタを生成したスレッドが owner() の戻り値として登録される。イベントループの開始はこのおオーナーのみが行え、他のスレッドから呼ばれると -1 を返して即終了する。
7.7.2 ACE_WFMO_Reactor と ACE_Msg_WFMO_Reactor
ACE_WFMO_Reactor は windows におけるデフォルトのリアクタ実装である。select() の代わりに WaitForMultipleObjects() を利用して動作するようになっている。このリアクタ実装については次のようなトレードオフが存在する。
- ハンドル数の制限。このリアクタは62個までしかハンドルを利用できない。本来は64までだが、ACE 内部で2個使うため62となっている。
- I/O型による制限。このリアクタは handle_input()、handle_output()、handle_exception() をソケットハンドルについてのみ利用できる。ACE_SPIPE_Stream などの他のIPC型には利用できない。しかしながらオーバーラップI/Oは利用できるので、8章で説明するプロアクターフレームワークを利用すること簡単にオーバーラップI/Oを使うことができる。
- Waitable ハンドル。このリアクタは WaitForMultipleObjects() に利用できるハンドルであれば、何であっても(例:ファイル変更通知ハンドル、イベントハンドル)反応することができる。そのためにはリアクタの register_handler() を利用する。登録したイベントが発火するとハンドラの handle_signal() がコールされる。
- マルチスレッド対応。このリアクタのイベントループはマルチスレッドでも動かすことができる。この場合はそれぞれのコールバックが違うスレッドで実行されることを踏まえて、同期制御する必要がある。
- ハンドラ解除の遅延。マルチスレッド対応のイベントループのため、ハンドラ解除の指示を出してもすぐには実行されず遅延される場合がある。このため、プログラマはハンドラの解除がコールバックが -1 を返したために起こったのか、remove_handler() を呼び出したためなのかを覚えておかなければならない。詳しくは p.183-184 の原文を参照のこと。
もし作成するアプリケーションが COM/DCOM サーバである場合、ACE_WFMO_Reactor の代わりに ACE_Msg_WFMO_Reactor を利用するべきである。こちらは windows メッセージもディスパッチする。
7.7.3 ACE_TP_Reactor
ACE_TP_Reactor は ACE_Select_Reactor の、マルチスレッド対応にした拡張版である。プログラマが用意した一つ以上のスレッドからイベントループを呼び出して利用する。これらはリーダー・フォロワーパターンに則っており、リーダーがリアクタの所有権を持ってイベント待ちしている間、フォロワー達は待機している。何かのイベントが発生したらリーダーが対応を始め、フォロワーのうち一人が新しいリーダーになる。このプロセスがリアクタのシャットダウンまで継続する。
7.7.4 ACE_Priority_Reactor
ACE_Priority_Reactor も ACE_Select_Reactor の拡張版である。こちらは ACE_Event_Handler クラスに priority() メソッドが追加され、各ハンドラはその返り値に応じた位置で待機する。イベントが届くと、priority の高いハンドラが優先して処理されるようになっている。
7.7.5 GUIと統合されたリアクタ
リアクタベースの GUIアプリケーションを作成するために、いくつかのツールキット向けリアクタが作成されている。これらを利用することで GUI イベントとアプリケーションイベントを一つのスレッドで処理することができる。リストは p.185 参照のこと。
Index
Top