Страницы

воскресенье, 21 декабря 2014 г.

Node.js Stream Error Handling

Обработка ошибок в объектах, реализующих интерфейс Stream в Node.js - задача нетривиальная. Прежде чем мне случилось "по-взрослому" поработать с потоками я полагал, что событие error останавливает процесс передачи данных. Оказалось что это не так. Приведу несколько примеров, демонстрирующих это, в моем случае неожиданное, поведение. Заодно рассмотрим кастомную реализацию некоторых типов интерфейса Stream.

Process.

Используем process.stdin и process.stdout: попробуем передать данные "по конвейеру", для чего напишем простенькое приложение, которое будет принимать в качестве аргумента строку, содержащую число и на выходе добавлять к полученному аргументу декремент числа через запятую с пробелом.
stream.js:
process.stdin.on('readable', function() {
  var buffer = process.stdin.read();
  if (buffer === null) return;
  var s = buffer.toString().trim(), arr = s.split(''), d = arr[arr.length - 1];      
  process.stdout.write(s + ', ' + (d - 1));      
});

Отправим в приложение строку "result: 3" и пропустим ее через наш "конвейер" несколько раз:

Теперь попробуем выкинуть ошибку в каждой итерации "конвейера", как в stdin, так и в stdout.
stream.js:
process.stdin.on('readable', function() {
  var buffer = process.stdin.read();
  if (buffer === null) return;
  var s = buffer.toString().trim(), arr = s.split(''), d = arr[arr.length - 1];
  setTimeout(function() {
    process.stdout.write(s + ', ' + (d - 1));
  }, 500);
  setTimeout(function() {
    process.stdin.emit('error', new Error('stdin error'));
  }, 200);
  setTimeout(function() {
    process.stdout.emit('error', new Error('stdout error'));
  }, 300);
});

process.stdin.on('error', function(err) {
  console.error(process.pid + ' stdin error: ' + err);
});
process.stdout.on('error', function(err) {
  console.error(process.pid + ' stdout error: ' + err);
});

И ничего не случилось - данные все равно "доползли" до заключительной серии:

Для тех кто в танке :) обозначу еще раз: я то думал (уверен что не только я один), что в результате первой же ошибки процесс передачи данных будет остановлен.

В этом случае решает process.exit().
stream.js:
process.stdin.on('readable', function() {
  var buffer = process.stdin.read();
  if (buffer === null) return;
  var s = buffer.toString().trim(), arr = s.split(''), d = arr[arr.length - 1];
  setTimeout(function() {
    process.stdout.write(s + ', ' + (d - 1));
  }, 500);
  setTimeout(function() {
    process.stdin.emit('error', new Error('stdin error'));
  }, 200);
  setTimeout(function() {
    process.stdout.emit('error', new Error('stdout error'));
    process.exit(1);
  }, 300);
});

process.stdin.on('error', function(err) {
  console.error(process.pid + ' stdin error: ' + err);  
});
process.stdout.on('error', function(err) {
  console.error(process.pid + ' stdout error: ' + err);
});


Writable.

Реализуем writable интерфейс.
writable.js:
var writable = require('stream').Writable();

var i = 4, arr = [];

writable._write = function(chunk, enc, next) {
  console.log('chunk: %s', chunk);
  arr.push(chunk);
  next();
};

writable.on('finish', function() {
  console.log('result: %s', arr.join(', '));
});

(function go() {
  if (!i--) return writable.end();
  setTimeout(function() {
    writable.write(i.toString());
    go();
  }, 500);
})();


Попробуем выкинуть ошибку.
writable.js:
var writable = require('stream').Writable();

var i = 4, arr = [];

writable._write = function(chunk, enc, next) {
  console.log('chunk: %s', chunk);
  arr.push(chunk);
  next();
};

writable.on('finish', function() {
  console.log('result: %s', arr.join(', '));
});

writable.on('error', function(err) {
  console.log('err:', err);
});

(function go() {
  if (!i--) return writable.end();
  setTimeout(function() {
    writable.write(i.toString());
    go();
  }, 500);
})();

setTimeout(function() {
  writable.emit('error', new Error('Bla-bla-bla'));
}, 1200);

Результат не изменился.

Завершим writable-поток "вручную" - this.end(). При этом перед тем как отправлять в поток очередную порцию данных желательно проверить его состояние writable._writableState.ended.
writable.js:
var writable = require('stream').Writable();

var i = 4, arr = [];

writable._write = function(chunk, enc, next) {
  console.log('chunk: %s', chunk);
  arr.push(chunk);
  next();
};

writable.on('finish', function() {
  console.log('result: %s', arr.join(', '));
});

writable.on('error', function(err) {
  console.log('err:', err);  
  this.end();  
});

(function go() {  
  if (!i--) return writable.end();
  setTimeout(function() {
    if (writable._writableState.ended) return;
    writable.write(i.toString());
    go();
  }, 500);
})();

setTimeout(function() {
  writable.emit('error', new Error('Bla-bla-bla'));
}, 1200);



Readable.

Реализуем readable интерфейс.
readable.js:
var readable = require('stream').Readable();

var i = 4, arr = [];

readable._read = function() {
  if (!i--) return this.push(null);
  setTimeout(function() {
    arr.push(i);
    readable.push('chunk: ' + i + '\n');
  }, 500);
};

readable.on('end', function() {
  console.log('result: %s', arr.join(', '));
});

readable.pipe(process.stdout);


Попробуем выкинуть ошибку.
readable.js:
var readable = require('stream').Readable();

var i = 4, arr = [];

readable._read = function() {
  if (!i--) return this.push(null);
  setTimeout(function() {
    arr.push(i);
    readable.push('chunk: ' + i + '\n');
  }, 500);
};

readable.on('end', function() {
  console.log('result: %s', arr.join(', '));
});

readable.on('error', function(err) {
  console.log('error:', err);
});

readable.pipe(process.stdout);

setTimeout(function() {
  readable.emit('error', new Error('Bla-bla-bla'));
}, 1200);

Результат не изменился.

Завершим readable-поток "вручную" - this.push(null). При этом перед тем как отправлять в поток очередную порцию данных желательно проверить его состояние readable._readableState.ended.
readable.js:
var readable = require('stream').Readable();

var i = 4, arr = [];

readable._read = function() {  
  if (!i--) return this.push(null);
  setTimeout(function() {
    if (readable._readableState.ended) return;
    arr.push(i);
    readable.push('chunk: ' + i + '\n');
  }, 500);
};

readable.on('end', function() {
  console.log('result: %s', arr.join(', '));
});

readable.on('error', function(err) {
  console.log('error:', err);  
  this.push(null);  
});

readable.pipe(process.stdout);

setTimeout(function() {
  readable.emit('error', new Error('Bla-bla-bla'));
}, 1200);


Transform.

Реализуем transform интерфейс.
transform.js:
var transform = require('stream').Transform();

var i = 4, arr = [];

transform._transform = function(chunk, enc, next) {
  this.push('chunk: ' + chunk + '\n');
  arr.push(chunk + '0');
  next();
};

transform.on('finish', function() {
  console.log('result: %s', arr.join(', '));
});

transform.pipe(process.stdout);

(function go() {
  if (!i--) return transform.end();
  setTimeout(function() {
    transform.write(i.toString());
    go();
  }, 500);
})();


Попробуем выкинуть ошибку.
transform.js:
var transform = require('stream').Transform();

var i = 4, arr = [];

transform._transform = function(chunk, enc, next) {
  this.push('chunk: ' + chunk + '\n');
  arr.push(chunk + '0');
  next();
};

transform.on('finish', function() {
  console.log('result: %s', arr.join(', '));
});

transform.on('error', function(err) {
  console.log('error:', err);
});

transform.pipe(process.stdout);

(function go() {
  if (!i--) return transform.end();
  setTimeout(function() {
    transform.write(i.toString());
    go();
  }, 500);
})();

setTimeout(function() {
  transform.emit('error', new Error('Bla-bla-bla'));
}, 1200);

И снова результат не меняется.

Завершим transform-поток "вручную" - this.end(). Кроме того у transform-потока есть как _readableState, так и _writableState. Я думаю что в нашем случае будет "правильнее" проверить его состояние transform._writableState.ended перед тем тем как отправлять в него очередную порцию данных.
transform.js:
var transform = require('stream').Transform();

var i = 4, arr = [];

transform._transform = function(chunk, enc, next) {
  this.push('chunk: ' + chunk + '\n');
  arr.push(chunk.toString() + '0');
  next();
};

transform.on('finish', function() {
  console.log('result: %s', arr.join(', '));
});

transform.on('error', function(err) {
  console.log('error:', err);  
  this.end();
});

transform.pipe(process.stdout);

(function go() {
  if (!i--) return transform.end();
  setTimeout(function() {
    if (transform._writableState.ended) return;    
    transform.write(i.toString());
    go();
  }, 500);
})();

setTimeout(function() {
  transform.emit('error', new Error('Bla-bla-bla'));
}, 1200);


Pipe.

Соберем readabletransform и process.stdout в один конвейер.
pipe.js:
var i = 4, arr = [];
// readable
var readable = require('stream').Readable();

readable._read = function() {
  if (!i--) return this.push(null);
  setTimeout(function() {
    if (readable._readableState.ended) return;
    readable.push(i.toString());
  }, 500);
};

readable.on('end', function() {
  console.log('readable end');
});

// transform
var transform = require('stream').Transform();

transform._transform = function(chunk, enc, next) {
  this.push('chunk: ' + chunk + '\n');
  arr.push(chunk + '0');
  next();
};

transform.on('end', function() {
  console.log('transform end');
  console.log('result: %s', arr.join(', '));
});

transform.on('finish', function() {
  console.log('transform finish');
});

// pipe
readable.pipe(transform).pipe(process.stdout);


Выкидываем ошибку в readable-потоке.
pipe.js:
var i = 4, arr = [];
// readable
var readable = require('stream').Readable();

readable._read = function() {
  if (!i--) return this.push(null);
  setTimeout(function() {
    if (readable._readableState.ended) return;
    readable.push(i.toString());
  }, 500);
};

readable.on('end', function() {
  console.log('readable end');
});

// transform
var transform = require('stream').Transform();

transform._transform = function(chunk, enc, next) {
  this.push('chunk: ' + chunk + '\n');
  arr.push(chunk + '0');
  next();
};

transform.on('end', function() {
  console.log('transform end');
  console.log('result: %s', arr.join(', '));
});

transform.on('finish', function() {
  console.log('transform finish');
});

// pipe
readable.pipe(transform).pipe(process.stdout);

readable.on('error', function(err) {
  console.log('readable error:', err);  
  this.push(null);  
});

transform.on('error', function(err) {
  console.log('transform error:', err);  
  this.end();
});

setTimeout(function() {
  readable.emit('error', new Error('Bla-bla-bla'));
}, 1200);


Обращаю внимание на то, что transform-поток ничего не знает об ошибке.

Выкидываем ошибку в transform-потоке.
pipe.js:
var i = 4, arr = [];
// readable
var readable = require('stream').Readable();

readable._read = function() {
  if (!i--) return this.push(null);
  setTimeout(function() {
    if (readable._readableState.ended) return;
    readable.push(i.toString());
  }, 500);
};

readable.on('end', function() {
  console.log('readable end');
});

// transform
var transform = require('stream').Transform();

transform._transform = function(chunk, enc, next) {
  this.push('chunk: ' + chunk + '\n');
  arr.push(chunk + '0');
  next();
};

transform.on('end', function() {
  console.log('transform end');
  console.log('result: %s', arr.join(', '));
});

transform.on('finish', function() {
  console.log('transform finish');
});

// pipe
readable.pipe(transform).pipe(process.stdout);

readable.on('error', function(err) {
  console.log('readable error:', err);
  this.push(null);
});

transform.on('error', function(err) {
  console.log('transform error:', err);
  this.end();
});

setTimeout(function() {
  transform.emit('error', new Error('Bla-bla-bla'));
}, 1200);


Теперь readable-поток не в курсе что его "отцепили", даже его событие end не сыграло, но тем не менее приложение в целом отработало валидно.

Заключение.

Несмотря на некоторые "вольности", которые допустимы при работе с объектами, реализующими интерфейс Stream, не забываем обрабатывать ошибки в каждом из них, хотя бы тупо выводом в консоль, иначе будет... печалька :).