Страницы

пятница, 24 июля 2015 г.

Node.js. Scaling apps

Секрет создания больших приложений - не создавать большие приложения. В этом посте я хочу раскрыть рецепт приготовления масштабируемых приложений на архитектуре микросервисов на платформе Node.js с использованием MongoDB. Для примера приготовим какое-нибудь бесполезное веб-приложение. Особенность этого рецепта в том, что ничего кроме Node.js и MongoDB не потребуется.

Не знаю что получится в итоге, думаю здесь важен сам процесс, а не результат, поехали...

1. Cluster.

Для начала напишем простой http-сервер.
- app.js:
'use strict';
 
var host = process.env.HOST || '127.0.0.1';
var port = process.env.PORT || process.argv[2] || 3000;
 
var server = require('http').createServer(function(req, res) {
  console.log('Process <%s> is handling request', process.pid);
  res.end(process.pid.toString());
});
 
server.listen(port, host, function() {
  console.log('Server started at', server.address());
});
 
setTimeout(function() {
  throw new Error('Ooops');
}, 20000);

Запустим по одному процессу на каждое ядро процессора с помощью модуля cluster.
- cl.js:
'use strict';
 
var cluster = require('cluster');
 
if (cluster.isMaster) {
  start();
} else {
  require('./app');
}
 
cluster.on('online', onOnline);
cluster.on('exit', onExit);
 
function start() {
  var i = require('os').cpus().length;
  while (i--) {
    spawn();
  }
  process.on('SIGUSR2', onRestart);
  // setTimeout(onRestart, 10000);
}
 
function spawn(env) {
  return cluster.fork(env);
}
 
function onOnline(worker) {
  console.log('Worker %s online, pid: %s', worker.id, worker.process && worker.process.pid);
}
 
function onExit(worker, code, signal) {
  if (!worker.suicide && code !== 0) {
    console.log('Worker %s crashed, code: %s, signal: %s', worker.id, code, signal);
    spawn();
  }
}
 
function onRestart() {
  console.log('Restarting workers');
  var workers = Object.keys(cluster.workers);
  var i = workers.length;
  var timeout = process.env.WORKER_TIMEOUT || 1000;
  (function restart() {
    var worker = cluster.workers[workers[--i]];
    if (!worker) return;
    console.log('Restarting worker %s, pid: %s', worker.id, worker.process && worker.process.pid);
    setTimeout(function() {
      worker.kill();
      worker.once('exit', function() {
        spawn().once('online', restart);
      });
    }, timeout);
  })();
}


Через 10 секунд рестартуем каждый из процессов по очереди, таким образом обеспечив доступность 100% по ходу рестарта.
В Linux для этого выполняем kill -SIGUSR2 <PID>, где PID - ID мастер-процесса, узнать который можно выполнив следующую команду: ps ax | grep node.
В Windows этот фокус не пройдет, для теста раскомментируем строку setTimeout(onRestart, 10000);

Через 20 секунд каждый из процессов упадет с ошибкой, после чего успешно поднимется обратно, что и требовалось.

Еще одно замечание на предмет работы модуля cluster: если затестить в Windows, то мы всегда будем попадать на один и тот же процесс, и с этим ничего не поделаешь. В любом случае будем считать, что "боевой" сервер будет не под окнами.

2. Service Registry.

Развернем реплику MongoDB со следующим конфигом:
{
  _id: "x",
  members: [
    {_id: 0, host: "localhost:27001"},
    {_id: 1, host: "localhost:27002"},
    {_id: 2, host: "localhost:27003", arbiterOnly: true}
  ]
}

Напишем модуль регистрации сервисов.
- reg.js:
'use strict';
 
var mongodb = require('mongodb');
var ObjectID = mongodb.ObjectID;
var str = 'mongodb://localhost:27001,localhost:27002/test?replicaSet=x';
 
module.exports = function(opts, cb) {
  mongodb.connect(str, function(err, db) {
    if (err) return cb(err);
    process._id = new ObjectID();
    db.collection('services').insertOne({
      _id: process._id,
      service: opts.service || process.env.SERVICE || 'test',
      host: opts.host,
      port: opts.port,
      pid: process.pid
    }, function(err) {
      if (err) return cb(err);
      db.close();
      process.on('SIGINT', onExit.bind(null, 'SIGINT'));
      process.on('SIGTERM', onExit.bind(null, 'SIGTERM'));
      process.on('message', onMessage);
      process.on('uncaughtException', onError);
      cb();
    });
  });
};
 
function onExit(signal) {
  unregister(function(err) {
    if (err) console.error(err);
    console.log('Process %s exit, signal: %s', process.pid, signal);
    process.exit(0);
  });
}
 
function onMessage(message) {
  if (message === 'shutdown') {
    unregister(function(err) {
      if (err) console.error(err);
    });
  }
}
 
function onError(err) {
  console.error(err.stack);
  unregister(function(err) {
    if (err) console.error(err);
    process.exit(1);
  });
}
 
function unregister(cb) {
  mongodb.connect(str, function(err, db) {
    if (err) return cb(err);
    db.collection('services').deleteOne({_id: process._id}, function(err) {
      db.close();
      cb(err);
    });
  });
}

При запуске каждый сервис создает документ в коллекции services, перед завершением работы удаляет созданный документ.

Научим http-сервер регистрироваться в реестре сервисов.
- app.js:
'use strict';
 
var host = process.env.HOST || '127.0.0.1';
var port = process.env.PORT || process.argv[2] || 3000;
 
var server = require('http').createServer(function(req, res) {
  console.log('Process <%s> is handling request', process.pid);
  res.end(process.pid.toString());
});
 
require('./reg')({host: host, port: port}, function(err) {
  if (err) throw err;
  server.listen(port, host, function() {
    console.log('Server started at', server.address());
  });
});
 
setTimeout(function() {
  throw new Error('Ooops');
}, 20000);

Обновим модуль запуска кластера. Добавим мастер-процессу слушателя событий SIGINT и SIGTERM - функцию по имени onKill() - для того, чтобы сервисы успевали удалить запись о своем существовании из коллекции services перед тем как уйти в небытие. Кроме того в процессе рестарта рабочих процессов - onRestart() - модуль будет отправлять каждому рабочему процессу сообщение - worker.send('shutdown') - с той же целью.
- cl.js:
'use strict';
 
var cluster = require('cluster');
 
if (cluster.isMaster) {
  start();
} else {
  require('./app');
}
 
cluster.on('online', onOnline);
cluster.on('exit', onExit);
 
function start() {
  var i = require('os').cpus().length;
  while (i--) {
    spawn();
  }
  process.on('SIGINT', onKill);
  process.on('SIGTERM', onKill);
  process.on('SIGUSR2', onRestart);
  // setTimeout(onRestart, 10000);
}
 
function spawn(env) {
  return cluster.fork(env);
}
 
function onOnline(worker) {
  console.log('Worker %s online, pid: %s', worker.id, worker.process && worker.process.pid);
}
 
function onExit(worker, code, signal) {
  if (!worker.suicide && code !== 0) {
    console.log('Worker %s crashed, code: %s, signal: %s', worker.id, code, signal);
    spawn();
  }
}
 
function onRestart() {
  console.log('Restarting workers');
  var workers = Object.keys(cluster.workers);
  var i = workers.length;
  var timeout = process.env.WORKER_TIMEOUT || 1000;
  (function restart() {
    var worker = cluster.workers[workers[--i]];
    if (!worker) return;
    console.log('Restarting worker %s, pid: %s', worker.id, worker.process && worker.process.pid);
    worker.send('shutdown');
    setTimeout(function() {
      worker.kill();
      worker.once('exit', function() {
        spawn().once('online', restart);
      });
    }, timeout);
  })();
}
 
function onKill() {
  setTimeout(function() {
    console.log('Bye');
    process.exit(0);
  }, process.env.WORKER_TIMEOUT || 1000);
}

Запускаем http-сервер, наблюдаем такую же картину, как и в прошлый раз - на скриншотах выше.
Сразу после запуска коллекция services выглядит примерно так:

После падений, рестартов и т.п. мы имеем все те же сервисы, только с иными _id и pid:

После завершения работы коллекция снова пустая.

3. Reverse Proxy.

Растащим приложение еще больше "в ширину" с помощью балансировщика.
- proxy.js:
'use strict';
 
var host = process.env.HOST || '127.0.0.1';
var port = process.env.PORT || process.argv[2] || 5000;
 
var express = require('express');
var app = express();
var logger = require('morgan');
 
var httpProxy = require('http-proxy');
 
var cache = {
  test: []
};
var services = {
  test: roundr(cache.test)
};
var onNextService = {i: insertService, d: deleteService};
 
require('./tail')(onNextService, function(err) {
  if (err) throw err;
  app.use(logger('dev'));
  app.get('/', testHandler);
  app.use(err404Handler);
  app.use(err500Handler);
  var server = require('http').createServer(app);
  server.listen(port, host, function() {
    console.log('Proxy server running at', server.address());
  });
});
 
function roundr(arr) {
  var i = Number.MAX_SAFE_INTEGER;
  return {
    next: function() {
      i = i - 1 || Number.MAX_SAFE_INTEGER;
      return arr[i%arr.length];
    },
    hasNext: function() {
      return !!arr.length;
    }
  };
}
 
function insertService(item) {
  if (item && item.service && cache[item.service] && item.host && item.port && !cache[item.service].some(serviceExists) && item._id) {
    console.log('Process <%s> is creating <%s> service proxy on <%s:%s>', process.pid, item.service, item.host, item.port);
    cache[item.service].push(createProxy({ host: item.host, port: item.port }, item._id.toString()));
  }
 
  function serviceExists(obj) {
    return obj.options && obj.options.target && obj.options.target.host === item.host && obj.options.target.port === item.port;
  }
}
 
function deleteService(item) {
  var _id = item._id && item._id.toString();
  if (_id) {
    Object.keys(cache).every(function(service) {
      return cache[service].every(removeService);
    });
  }
 
  function removeService(obj, i, arr) {
    if (obj._id === _id) {
      arr.splice(i, 1);
      return false;
    }
    return true;
  }
}
 
function createProxy(target, _id) {
  var proxy = httpProxy.createProxyServer({ target: target });
  proxy._id = _id;
  proxy.on('error', onError);
  return proxy;
}
 
function onError(err, req, res) {
  if (err.code === 'ECONNRESET') return req.connection.destroy();
  console.error(err);
  if (!res.headersSent) {
    res.writeHead(500);
    res.end();
  }
}
 
function testHandler(req, res, next) {
  if (services.test.hasNext()) return services.test.next().web(req, res);
  next(new Error('Service unavailable'));
}
 
function err404Handler(req, res, next) {
  var err = new Error('Not Found');
  err.status = 404;
  next(err);
}
 
function err500Handler(err, req, res, next) {
  res.status(err.status || 500).end(err.message);
}

В качестве веб-сервера балансировщик использует фреймворк express и отправляет все запросы по маршруту '/' существующим сервисам по алгоритму round-robin. Прочие маршруты и алгоритмы в случае необходимости реализуем самостоятельно.

Перед запуском балансировщик получает все документы коллекции services и создает для каждого сервиса прокси с помощью модуля http-proxy, а также начинает пасти появление новых сервисов и удаление существующих, используя модуль tail.
- tail.js:
'use strict';
 
var mongodb = require('mongodb');
var tail = require('mongodb-tail');
 
var local = 'mongodb://localhost:27001,localhost:27002/local?replicaSet=x';
var test = 'mongodb://localhost:27001,localhost:27002/test?replicaSet=x';
 
module.exports = function(onNextService, cb) {
  mongodb.connect(local, function(err, local) {
    if (err) return cb(err);
    tail(local.collection('oplog.rs'), {
      getCursor: function getCursor(latest) {
        return {
          ts: {$gt: latest.ts},
          fromMigrate: {$exists: false},
          ns: 'test.services',
          op: {$in: ['i', 'd']}
        };
      },
      onNextObject: function onNextObject(item, cb) {
        if (item.o) onNextService[item.op].call(null, item.o);
        cb();
      }
    }, function() {
      mongodb.connect(test, function(err, db) {
        if (err) return cb(err);
        db.collection('services').find({}, function(err, cursor) {
          if (err) return cb(err);
          cursor.each(function(err, item) {
            if (err) return cb(err);
            if (item) return onNextService.i(item);
            db.close();
            return cb();
          });
        });
      });
    });
    process.on('SIGINT', onExit.bind(null, 'SIGINT'));
    process.on('SIGTERM', onExit.bind(null, 'SIGTERM'));
    process.on('uncaughtException', onError);
 
    function onExit(signal) {
      setTimeout(function() {
        local.close();
        console.log('Process %s exit, signal: %s', process.pid, signal);
        process.exit(0);
      }, 0);
    }
 
    function onError(err) {
      console.error(err.stack);
      setTimeout(function() {
        local.close();
        process.exit(1);
      }, 0);
    }
  });
};

Для запуска балансировщика с помощью модуля cluster подмолодим код файла cl.js - добавим возможность запуска не только './app.js', но и любого другого приложения, отправленного вторым аргументом команды запуска кластера (первый аргумент уже занят - это порт).
- cl.js:
...
if (cluster.isMaster) {
  start();
} else {
  require(process.argv[3] || './app');
}
...

Запускаем кластер балансировщиков на порту 5000:

Открываем браузер, идем по адресу http://localhost:5000, на что балансировщик недвусмысленно дает нам понять что в настоящий момент у нас нет серверов способных обработать этот запрос:

Запускаем пару сервисов по имени test (они же http-серверы) на разных портах:


И наш балансировщик начинает раскидывать запросы по только что запущенным сервисам в порядке живой очереди:

Коллекция services в этот момент выглядит следующим образом:

That's all folks! В результате мы приготовили вполне себе масштабируемое приложение. Толку от него никакого, т.к. сервис по имени test не делает ничего кроме как отдает в ответ на запрос свой PID, что впрочем не мешает приложению в случае необходимости стать сколько угодно большим.