ぼちぼち日記

おそらくプロトコルネタを書いていることが多いんじゃないかと思います。

Node.jsにPromiseが再びやって来た!

tl;dr

サンプルコードを付けたら記事がかなり長くなってしまったのでご注意下さい。
Node.jsの current master で V8がアップデートされ ES6の Promise が使えるようになりました(要オプションですが)。Promise を使うと Node.jsの非同期処理がどのようになるのか、Stream と Promise を組み合わせた使い方なども含めて紹介します。

1. はじめに

Nodeの次期安定版 v0.12は、すぐ出ると言われながら既に v0.10のリリースから1年が過ぎてしまいました。
現在、v0.12の主要な新機能の実装は完了していますが、まだ安定版のリリースに向けて手当できていない部分が残っている感じです。そんな残っている部分の一つだった V8 のアップデートが先週末に行われました。

現在の安定版 Node-v0.10.26 では、 V8 は 3.14.5 を利用しています。Node の安定版では、基本 V8 のバージョンアップは行わず、V8 に関してはバックポート等によるバグフィックス対応が中心です。Node-v0.10のリリース後、V8のAPI仕様が大幅に変更されたので、それに対応するためNodeの中身は大きく変わっています。(合わせて multi context対応もやっています)

そんなこともあり、開発版では長らく3.22が使われていましたが、最近になって 3.24に上がりました。これまでにも、安定版リリース直前によくV8のバージョンを上げたり下げたりしますので、今回も実は0.12リリース直前の予兆かもしれません。

この V8 Ver. 3.24で何がいいかというと、個人的には ES6 で現在仕様検討中の Promise が使えるようになったことです。実は昔の Node では Promise が独自実装されていました。4年ほど前に議論の末 deprecate されたのですが、今回ES6で検討中で PromiseA+ 仕様をベースとしたV8の機能となって Promise が再登場です。

ブラウザでは昨年11月ぐらいから利用できていて、今手元の Google Chrome(33.0.1750.154)は、 V8 3.23.17.23 なのでおそらく次の M34 (現beta or Dev?)から Promise が使えるようになるんじゃないかと思います。(ChromeでES6系の機能を利用するには、chrome://flags で「JavaScript の試験運用機能を有効にする」を指定しておくことが必要です。)

Promiseについては下記記事が詳しいのでここでは説明しません。(ちなみに PromiseA+仕様策定の中心人物 Domenic Denicola さんは、Node-v0.12で入る新VMモジュールの作者でもあります。)

また Node での ES6系機能の扱いは下記記事を参照するとよいでしょう。

2. 早速 Node で Promiseを使ってみる

まだ github の master ブランチにしかないので自分でビルドしてみましょう。NodeとV8のバージョンを確認します。

$ node -v
v0.11.13-pre
$ node -e 'console.log(process.versions.v8)'
3.24.35.17

まずは Promise がちゃんと動作するかの確認です。ファイルの stat を取る非同期処理で試してみます。

// 引数で指定したファイルの確認を Promise で処理するサンプル promise_sample1.js
var fs = require('fs');
var file = process.argv[2] || __filename;
var promise = new Promise(function(onFulfilled, onRejected) {
  fs.stat(file, function(err, stat) {
    // return stat object when fulfilled
    err ? onRejected(err) : onFulfilled(stat);
  });
});
// 正常処理なら onFulfilled, エラーなら onRejected が実行される
promise.then(
  function(stat) {
    console.log('Fulfilled:', file, stat);
  },
  function(error) {
    console.log("Rejected:", error.message);
  }
);

で実行してみます。

# ファイルが存在している時
$ node --harmony promise_sample1.js  promise_sample2.js
Fulfilled: promise_sample2.js { dev: 64512,
  mode: 33204,
(中略)
  ctime: Tue Mar 18 2014 15:26:45 GMT+0900 (JST),
  birthtime: Tue Mar 18 2014 15:26:45 GMT+0900 (JST) }

# ファイルが存在していない時
$ node --harmony promise_sample1.js  hoge.txt
Rejected: ENOENT, stat 'hoge.txt'

おぉ、ちゃんと動きました。ではもうちょっと応用を。

3. thenable を使ってみる

thenableというのは、then()メソッドを持つ普通の(Promiseでない)オブジェクトの事です。
PromiseA+仕様では、なんとこの thenableからPromiseオブジェクトを生成できるのです。素晴らしい。

早速 thenable を試しましょう。

サンプルとして静的ファイルを出力するHTTPサーバを作ります。登録されたmimeタイプを持つファイルにアクセスしたらそれを出力、そうでなければ404を返します。HTTPサーバのリクエストオブジェクトを thenable にして、非同期のエラー処理を Promiseで行います。
ここでは、Promise.resolve()というクラスメソッドを用いて thenable から Promiseオブジェクトを生成します。

// Promiseを使って静的ファイルを出力するHTTPサーバのサンプル promise_sample2.js
var path = require('path');
var http = require('http');
var fs = require('fs');
// List of valid file extensions
var mime_type = {'.txt': 'text/plain', '.html': 'text/html', '.png': 'image/png'};

// http.Server のrequestオブジェクトを thenable にする
http.IncomingMessage.prototype.then = function(onFulfilled, onRejected) {
  var mime = mime_type[path.extname(this.url)];
  if (!mime) { // extensin check
    onRejected(new Error('Invalid extension'));
    return;
  }
  fs.readFile('./public' + this.url, function(err, data) {
    err ? onRejected(err): onFulfilled({data: data, mime: mime});
  });
};

var server = http.createServer(function(req, res) {
  var p = Promise.resolve(req); // thenableからPromiseを生成
  p.then(
    function(x) { // 静的ファイルの出力
      res.writeHead(200,
        {'content-type': x.mime, 'content-length': x.data.length});
      res.end(x.data);
    },
    function(err) { // エラーページ処理
      res.writeHead(404);
      res.end('Not Found');
    }
  );
}).listen(8080);

実際試してみるとちゃんと動作しました。 thenableが使えると既存のコードとPromiseの親和性が増します。

4. Promise.all による非同期処理の結合

Promiseを使う醍醐味の一つは、複数の非同期処理の一括取りまとめ処理です。
Nodeのコードを書く際に嫌われている、いわゆる「コールバック地獄」を避けることができます。Promise.allを使い、プロミスオブジェクトのリストを渡すと全部の正常処理が完了した処理が書けるのです。
通常Nodeだと、

fs.readdir(dir1, function(...) {
   fs.readFile(file1, function(...) {
     hash.update(data);
     fs.readdir(dir2, function(...) {
       fs.readFile(file2, function(...) {
          ...
       });
     });   
  });
});

の用にコールバック(地獄)処理をつらつら書き続けないといけないのですが、Promise を使うとこうなります。

function compareDirHash(dir1, dir2) {
  return compareFilesPromise(dir1, dir2).then(function(files) {
    return Promise.all(files.map(function(f) {
      return compareHashPromise(dir1, dir2, f);
    }));
  }).then(
    function(x) {
      // 正常処理
    },
    function(err) { // all case of errors
      // エラー処理
    }
  );
}
compareDirHash('./dir1/', './dir2/');

Promiseを使う方針としては、各処理を行う Promiseオブジェクトを返す関数を作り、最後に Promise.allで取りまとめるやり方です。
Promise.allを使う際には、Array.map を使ってプロミスオブジェクトのリストを生成するととても便利です。
こういったやり方は、Promise Anti-patternsを見ると参考になります。

// 2つのディレクトリ中の全てのファイルハッシュ値を比較するサンプル promise_sample3.js
var fs = require('fs');
var crypto = require('crypto');

// ディレクトリ中のファイルリストを取得する Promise処理
function readDirPromise(dir) {
  return new Promise(function(onFulfilled, onRejected) {
    fs.readdir(dir, function(err, files) {
      err ? onRejected(err): onFulfilled(files);
    });
  });
}
// dir1 と dir2 のファイルリストを比較する Promise 処理
function compareFilesPromise(dir1, dir2) {
  return new Promise(function(onFulfilled, onRejected) {
    Promise.all([
      readDirPromise(dir1),
      readDirPromise(dir2)
    ]).then(
      function(x) { // contains file lists in two directory
        var check = (x[0].length === x[1].length) &&
                    (x[0].sort().join() === x[1].sort().join());
        check ? onFulfilled(x[0]): onRejected(new Error('file lists are inconsist.'));
      },
      function(err) { onRejected(err);}
    );
  });
}
// ファイルの sha1 ハッシュを取得する Promise処理
function hashPromise(filename) {
  return new Promise(function(onFulfilled, onRejected) {
    var hash = crypto.createHash('sha1');
    var s = fs.createReadStream(filename);
    s.pipe(hash);
    s.on('error', function(err) {
      onRejected(err); // Hash calclation error
    });
    s.on('end', function() {
      onFulfilled(hash.read().toString('hex'));
    });
  });
}
// dir1 と dir2 中のファイルのハッシュ値を比較する Promise処理
function compareHashPromise(dir1, dir2, file) {
  return new Promise(function(onFulfilled, onRejected) {
    Promise.all([
      hashPromise(dir1 + file),
      hashPromise(dir2 + file)
    ]).then(
      function(x) { // contains has hash data list of two files
        x[0] === x[1] ? onFulfilled(x[0]): onRejected(new Error('hashes are inconsistent.'));
      },
      function(err) {
        onRejected(err);
      }
    );
  });
}
// dir1 と dir2 中の全てのファイルのハッシュ値を比較する Promise処理
function compareDirHash(dir1, dir2) {
  return compareFilesPromise(dir1, dir2).then(function(files) {
    return Promise.all(files.map(function(f) { // ハッシュ比較のPromiseオブジェクトを配列にして Promise.all に渡す
      return compareHashPromise(dir1, dir2, f);
    }));
  }).then(
    function(x) { // contains hash list of all files
      console.log('Success:', dir1, dir2, 'Hashes of all files are equal.');
    },
    function(err) { // all case of errors
      console.log('Error:', dir1, dir2, err.message);
    }
  );
}
// テストの実行 dir1 に9個のテキストファイル(hoge[1-9].txt)を格納
compareDirHash('./dir1/', './dir2/'); // dir1とdir2は全部一致
compareDirHash('./dir1/', './dir3/'); // dir3で1つファイルを削除
compareDirHash('./dir1/', './dir4/'); // dir4で1つファイルを変更 
compareDirHash('./dir1/', './not_exist_dir/'); // 存在していないディレクトリを指定

実行結果です。エラー処理もちゃんと動いてうれしい。

$ node --harmony promise_sample3.js
Error: ./dir1/ ./dir3/ file lists are inconsist.
Error: ./dir1/ ./not_exist_dir/ ENOENT, readdir './not_exist_dir/'
Success: ./dir1/ ./dir2/ Hashes of all files are equal.
Error: ./dir1/ ./dir4/ hashes are inconsistent.

5. Node の Stream と Promise を組み合わせて利用する

実はちょっと前に @azu_re さんよりStreamの宿題をいただいており、その中に「StreamとPromise」がテーマとして挙げられていました。初っ端から大物テーマですが、タイミングがいいので取り組んでみます。
・・・
とはいうものの、なかなかこれだというのが思いつかないです。

Generator と同じく Promise を使いこなすには、かなり頭の体操が必要です。でもあまり時間もかけてられないので、ボツネタ2個と採用ネタ1つで許してください。
誰か Stream と Promise の素晴らしい使い方を思いついたら教えて欲しいです。

5.1 Streams API 風に使う(ボツネタ)

WHATWG/W3Cで現在仕様化が進められている Streams API を参考にしてみます。
これは Promise かつ Stream の両方を兼ね備えたナイスなAPIです。この仕様の example に書いてあるよう、streamデータの読み込みを Promiseを使って再帰的に処理することが可能です。

function ReadStream() {
  stream.read().then(
    function(data) {
       data処理 ...
       ReadStream();
    },
    function(err) {
       エラー処理 ...
    }
  );
}
ReadStream();

実際にサンプルコードを書いたのですが、Nodeでは readable イベントや data イベントでデータ読み込み処理ができるので、こういう方法はあまりメリットがないかなぁと思い、ボツにしました。

5.2 Streamのエラー処理に(ボツネタ)

Promise は catch で個別にエラーハンドリングができるので、これ使って

  stream1.catch(function(err){...}).pipe(stream2).catch(function(err){...}).pipe(stream3) ...

な感じに書けるのがいいんじゃないかと。でも on('error', function(err){}) と同じなのでボツ。

5.3 複数の Piped Stream 処理の結合(採用ネタ)

やっぱりこれでしょ。複数同時に pipe でつないだストリームをガッチャンコするために Promise を使うやり方です。

var pipe1 = stream11.pipe(stream12).pipe(stream13);
var pipe2 = stream21.pipe(stream22).pipe(stream23);
Promise.all([pipe1, pipe2]).then(
  function(result) {
    // 結合時の成功処理
  },
  function(err) {
    // エラー処理
  }
);

な感じに書ければ最高です。endイベントに合わせて Promise 状態をFulfillにしてやればいいでしょう。

以下サンプルコードです。長いので興味のある方以外は読み飛ばして結構です。

サンプルコードでは、簡単のためストリームソースとして object stream を扱います。プロパティ値を1倍、10倍、100倍にするtransform stream と1足す、10足す、100足すtransform stream を別々に処理して最後に値を合わせます。
作る際にちょっと苦労した点は以下の3点です。

  1. TransformストリームとPromiseを多重継承したオブジェクトは作れないのでpromiseを内部に埋め込んだ。(mix-inも試しましたがコンストラクタが instance of をチェックしているので断念しました)
  2. error は pipe で伝搬しないので、独自に promise_err イベントを作って、最後のストリームまで伝搬させました。(5.2みたいに個別 catchもありですけど)
  3. stream はデータを取り出してやらないと end しないので、 consume() メソッドを最後につけました。
var util = require('util');
var Transform = require('stream').Transform;

// Base Class of Transform which contains promise
function TransformPromise(manupilate, options) {
  Transform.call(this, options);
  this._writableState.objectMode = true;
  this._readableState.objectMode = true;
  this.bufs = []; // all filtered data stored
  var self = this;
  // for propagation promise_err to pipe end
  this.on('pipe', function(src) {
    src.on('promise_err', function(err) {
      self.emit('promise_err', err);
    });
  });
  // promise fullfilled in 'end' and rejected in 'promise error'
  this.promise = new Promise(function(onFulfilled, onRejected) {
    self.on('end', function() {
      onFulfilled(self.bufs); // return all data
    });
    self.on('promise_err', function(err) {
      onRejected(err);
    });
  });
}
util.inherits(TransformPromise, Transform);

// data consuming to store data and emit 'end' event
TransformPromise.prototype.consume = function() {
  this.on('readable', function() {
    var b = this.read();
    if (b) this.bufs.push(b);
  });
  return this;
};

// all values in object stream are multiplied
function Multiply(multiple, options) {
  TransformPromise.call(this, options);
  this.multiple = multiple || 1;
}
util.inherits(Multiply, TransformPromise);
Multiply.prototype._transform = function(chunk, encoding, cb) {
  var out =  {};
  for(var key in chunk) {
    if (!checkProperty(this, chunk[key])) break;
    out[key] = this.multiple * chunk[key];
  }
  this.push(out);
  cb();
};

// all values in object stream are incremented
function Plus(plus, options) {
  TransformPromise.call(this, options);
  this.plus = plus || 0;
}
util.inherits(Plus, TransformPromise);
Plus.prototype._transform = function(chunk, encoding, cb) {
  var out = {};
  for(var key in chunk) {
    if (!checkProperty(this, chunk[key])) break;
    out[key] = chunk[key] + this.plus;
  }
  this.push(out);
  cb();
};

var multi1 = new Multiply(1);     // *1 filter
var multi10 = new Multiply(10);   // *10 filter
var multi100 = new Multiply(100); // *100 fiter

var plus1 = new Plus(1);          // +1 filter
var plus10 = new Plus(10);        // +10 filter
var plus100 = new Plus(100);      // +100 filter

// filtered to vaule*1*10*100
var pipe1 = multi1.pipe(multi10).pipe(multi100).consume();
// filtered to value+1+10+100
var pipe2 = plus1.pipe(plus10).pipe(plus100).consume();

// two streams meets when both are fulfilled
Promise.all([pipe1.promise, pipe2.promise]).then(function(x) {
  var result = concatArrayElm(x[0], x[1]);
  console.log(result);
}).catch(function(err) {
  // catch all promise_errors
  console.log('Error:', err.message);
});

// data source of object stream
var source = [{a:1, b:2, c:3}, {d:4, e:5}, {f:6}];
// var source = [{a:1, b:2, c:3}, {d:4, e:'5'}, {f:6}]; // for error test

// write data to two streams;
source.forEach(function(elm) {
  multi1.write(elm);
  plus1.write(elm);
});

// finish writing
multi1.end();
plus1.end();

// check the property values if they are number
function checkProperty(self, value) {
  if(typeof value === 'number') {
    return true;
  } else {
    self.emit('promise_err', new Error(value + ' is not a number.'));
    return false;
  }
}

// final result is concated each values in two streams
function concatArrayElm(ar1, ar2) {
  if (ar1.length !== ar2.length) {
    throw new Error('source length is inconsistent.');
    return;
  }
  var ret = [];
  for(var i = 0; i < ar1.length; i++) {
    var obj = {};
    for(var key in ar1[i]) {
      obj[key] = ar1[i][key] + ar2[i][key];
    }
    ret.push(obj);
  }
  return ret;
}

で、実行結果。

# 正常時
$ node --harmony promise_sample5.js
[ { a: 1112, b: 2113, c: 3114 },
  { d: 4115, e: 5116 },
  { f: 6117 } ]
# エラー時
$ node --harmony promise_sample5.js
Error: 5 is not a number.

Generator とも組み合わせて考えるともっと面白いことができるかもしれません。今後に期待です。