Страницы

понедельник, 22 февраля 2016 г.

Node.js Stream. Promise vs. Callback

Пайпинг потоков в Node.js - дело нехитрое. Главное не забывать слушать ошибки в каждом потоке. И на сегодня это единственный кейс, где я нахожу оправданным использование промисов вместо функций обратного вызова. Объясню почему на простом примере. Для начала напишем три кастомных конструктора, которые наследуют от ReadableTransform, Writable и работают в Object Mode:

const stream = require('stream');
const Readable = stream.Readable;
const Transform = stream.Transform;
const Writable = stream.Writable;
 
const InitStream = function () {
  this._obj = {};
  Readable.call(this, { objectMode: true });
};
InitStream.prototype = Object.create(Readable.prototype, {
  _read: {
    value: function () {
      this.push(this._obj);
      this.push(null);
    }
  }
});
 
const TransformStream = function (obj) {
  this._obj = obj;
  Transform.call(this, { objectMode: true });
};
TransformStream.prototype = Object.create(Transform.prototype, {
  _transform: {
    value: function (chunk, encoding, cb) {
      this.push(Object.assign(chunk, this._obj));
      cb();
    }
  }
});
 
const LogStream = function () {
  Writable.call(this, { objectMode: true });
};
LogStream.prototype = Object.create(Writable.prototype, {
  _write: {
    value: function (chunk, encoding, cb) {
      console.log(chunk);
      cb();
    }
  }
});

Создадим несколько потоков:
const init = new InitStream(); //  "вбрасываем" пустой объект
const a = new TransformStream({ a: 1 }); // добавляем свойство "a"
const b = new TransformStream({ b: 2 }); // добавляем свойство "b"
const c = new TransformStream({ c: 3 }); // добавляем свойство "c"
const log = new LogStream(); // выводим в консоль

Пайпинг потоков может выглядеть следующим образом:
const pipe = (init, a, b, c, log, cb) => {
  log.on('finish', cb);
  init.pipe(a);
  a.pipe(b);
  b.pipe(c);
  c.pipe(log);
};
 
pipe(init, a, b, c, log, (err) => {
  if (err) return console.error(err);
  console.log('finish');
});


Разумеется мы забыли приклеить каждому потоку функцию обработки ошибок, поэтому если в TransformStream выкинуть ошибку, будет печалька:
TransformStream.prototype = Object.create(Transform.prototype, {
  _transform: {
    value: function (chunk, encoding, cb) {
      this.push(Object.assign(chunk, this._obj));
      cb(new Error('My Error'));
      // cb();
    }
  }
});


Если просто приклеить каждому потоку коллбэк, то есть вероятность того, что он будет вызван несколько раз (в нашем примере наверняка), а это как правило не то, что нам нужно:
const pipe = (init, a, b, c, log, cb) => {
  init.on('error', cb);
  a.on('error', cb);
  b.on('error', cb);
  c.on('error', cb);
  log.on('error', cb);  
  log.on('finish', cb);
  init.pipe(a);
  a.pipe(b);
  b.pipe(c);
  c.pipe(log);
};


Поэтому обычно приходится оборачивать коллбэк как-то так:
const pipe = (init, a, b, c, log, cb) => {
  let called = false;
  const callback = (err) => {
    if (called) return;
    called = true;
    cb(err);
  };
  init.on('error', callback);
  a.on('error', callback);
  b.on('error', callback);
  c.on('error', callback);
  log.on('error', callback);  
  log.on('finish', callback);
  init.pipe(a);
  a.pipe(b);
  b.pipe(c);
  c.pipe(log);
};


Но если применить промисы, то можно с этим на париться, т.к. как известно они резолвятся или реджектятся один раз:
const pipe = (init, a, b, c, log) => new Promise((resolve, reject) => {
  init.on('error', reject);
  a.on('error', reject);
  b.on('error', reject);
  c.on('error', reject);
  log.on('error', reject);
  log.on('finish', resolve);
  init.pipe(a);
  a.pipe(b);
  b.pipe(c);
  c.pipe(log);
});
 
pipe(init, a, b, c, log)
  .then(() => console.log('finish'))
  .catch((err) => console.error(err));


Дисклеймер: не думаю, что на текущий момент времени даже "нативные" промисы в JavaScript могут соревноваться в производительности с "vanilla" кодом, в общем, как говорится, думайте сами, решайте сами.