お手軽メッセージキュー「Kestrel」を使ってみる
いきさつ
ついに『斉藤さん』が北米・UKなどで公開されたらしいです。
海外ではウケるのかどうか未知数ではありますが、さらにアクセス数が増えると思うとgkbrな日々です。
今のところWebサーバーのプロセスが比較的スムーズに処理されているようなので問題ないのですが、「ある程度時間のかかる処理で、その処理の結果を返す必要がないもの*1」の割合が多くなってきた場合、処理の待ち時間が長くなってしまい、Webサーバーでのプロセスが溜まってしまう、というようなことが起きてしまう懸念点があります。
そういう処理ってWebサーバー側ではやることやったらレスポンスを返して、時間のかかる処理自体はバックエンドでやりたいですよね?
まあ、いま流行りの「非同期処理」というやつですね。
非同期処理をする場合はいわゆる「この処理をこういう条件でやってね!」→「はーい!わかりましたー!」というメッセージングのやり取りが必要になるので、Memcachedプロトコルを話せるお手軽なメッセージキューサーバー「Kestrel」を導入することにしました。
インストール
KestrelはScalaで作られているそうなので、Javaが動く環境なら公式サイトからビルド済みのプログラムファイル一式を持ってきて、適当な場所に置いてしまえば、あとは普通にjarファイルを実行するだけで使えます。
そして、常駐させておきたいのでデーモン化するためにシェルスクリプトをごりごり書く・・・のでもいいのですが、最近は常駐プログラムを管理するときに「Supervisor」というプロセス管理ツールを使うのが流行りのようなので、そっちでKestrelを常駐させるように設定します*2。
ついでに、タスクワーカー(タスクを処理するプログラム)もSupervisorで常駐させてしまうといい感じです。
詳しくはここを見ればだいたいわかります。
ハマった点
PHPのMemcached拡張(pecl-memcached)は、セットしたいデータが100バイトを超えると自動的に圧縮をかけるらしい。
普通にMemcache使う場合は自動的に解凍してくれるので問題ないが、Kestrelでは解凍してくれないので圧縮を無効化する必要がある。
その他
配列とかを入れても自動的にシリアライズ・デシリアライズしてくれますが、Msgpackとかが使える環境であれば、それをつかって手動でシリアライズ・デシリアライズしたほうが良さげ(多分そっちのほうが速い)。
サンプル(タスクキュー送信)
<?php $mem = new Memcached; $mem->addServer(KESTREL_HOST, 22133); // セットする際に自動的に圧縮しないようにする $mem->setOption(Memcached::OPT_COMPRESSION, false); $data = array('key'=>'value'); $data = msgpack_pack($data); // 「message」キューのタスクをセット($dataの中には処理に必要なデータを入れる) $mem->set('message', $data);
サンプル(ワーカープロセス)
<?php $mem = new Memcached; $mem->addServer(KESTREL_HOST, 22133); for (;;) { // 「message」キューのタスクがあるか2秒待つ $result = $mem->get('message/t=2000/open'); if ($result !== false) { $result = msgpack_unpack($result); // ここで$resultに応じて処理をする // 下記はAndroidのPush通知を送る処理の例 $registrationId = $result['registration_id']; $collapseKey = $result['collapse_key']; $data = $result['data']; $header = array(); $header[] = "Content-Type: application/x-www-form-urlencoded"; $header[] = "Authorization: GoogleLogin auth=" . GOOGLE_C2DM_TOKEN; $post = array(); $post['registration_id'] = $registrationId; $post['collapse_key'] = urlencode($collapseKey); foreach ($data as $name => $d) { $post['data.' . $name] = urlencode($d); } $postf = http_build_query($post, '&'); $ch = curl_init(GOOGLE_C2DM_URL); $options = array( CURLOPT_RETURNTRANSFER => TRUE, CURLOPT_FAILONERROR => TRUE, CURLOPT_FOLLOWLOCATION => TRUE, CURLOPT_POST => TRUE, CURLOPT_HTTPHEADER => $header, CURLOPT_POSTFIELDS => $postf, CURLOPT_HEADER => TRUE, CURLOPT_TIMEOUT => 15, CURLOPT_CONNECTTIMEOUT => 15, ); curl_setopt_array($ch, $options); curl_exec($ch); curl_close($ch); $result = $mem->get('message/close'); } }