Страницы

воскресенье, 6 сентября 2015 г.

MongoDB && Elasticsearch Node.js package

Не знаю как в настоящий момент обстоят дела с поддержкой river plugins в Elasticsearch, а Node.js-модуль по имени mongodb-es-river уже готов оказать помощь в индексировании документов MongoDB в Elasticsearch, теперь с возможностью горизонтального масштабирования процесса "перетекания" данных. Далее расскажу как приготовить river-сервис на основе перечисленных выше ингредиентов.

Разворачиваем MongoDB Replica Set:

Запускаем Elasticsearch Cluster:

Задача - идексировать документы коллекции users:

Решение:
  - package.json:
{
  "name": "river-0.10.0",
  "version": "1.0.0",
  "description": "",
  "main": "app.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "private": true,
  "dependencies": {
    "elasticsearch": "^8.0.1",
    "mongodb": "^2.0.42",
    "mongodb-es-river": "^0.10.0"
  }
}
  - app.js:
var mongodb = require('mongodb');
var local = 'mongodb://localhost:27001,localhost:27002/local?replicaSet=x';
var test = 'mongodb://localhost:27001,localhost:27002/test?replicaSet=x';

mongodb.connect(local, function(err, localDb) {
  if (err) throw err;
  mongodb.connect(test, function(err, testDb) {
    if (err) throw err;
    var settings = {
      oplog: localDb.collection('oplog.rs'),
      registry: testDb.collection('river'), // коллекция для хранения реестра сервисов
      client: require('elasticsearch').Client()
    };
    var options = [
      {
        index: 'test',
        type: 'users',
        collection: testDb.collection('users'), // целевая коллекция
        project: {name: 1, fullName: 1} // к примеру нас интересуют только поля name и fullName
      }
    ];
    require('mongodb-es-river')(settings, options, function(err) {
      if (err) throw err;
      console.log('All rivers run, pid:', process.pid);
    });
  });
});

Запускаем приложение:

Ищем юзеров:

Бинго.

Масштабируем "в ширину":
  - cluster.js:
var cluster = require('cluster');

if (cluster.isMaster) {
  start();
} else {
  require('./app');
}

function start() {
  var i = require('os').cpus().length;
  while (i--) {
    cluster.fork();
  }
  process.on('SIGINT', onKill);
  process.on('SIGTERM', onKill);
}

function onKill() {
  setTimeout(function() {
    process.exit(0);
  }, 100); // время, необходимое для удаления документа из коллекции river - реестра сервисов
}
Для теста выведем в консоль кое-какую информацию о работе приложения:
  - app.js:
var mongodb = require('mongodb');
var local = 'mongodb://localhost:27001,localhost:27002/local?replicaSet=x';
var test = 'mongodb://localhost:27001,localhost:27002/test?replicaSet=x';

mongodb.connect(local, function(err, localDb) {
  if (err) throw err;
  mongodb.connect(test, function(err, testDb) {
    if (err) throw err;
    var settings = {
      oplog: localDb.collection('oplog.rs'),
      registry: testDb.collection('river'), // коллекция для хранения реестра сервисов
      client: require('elasticsearch').Client(),
      worker: { // для теста
        value: {
          pid: process.pid,
        },
        onExit: function(signal) {
          console.log('Process %s exit, signal %s', process.pid, signal);
          process.exit(0);
        }
      }
    };
    var options = [
      {
        index: 'test',
        type: 'users',
        collection: testDb.collection('users'), // целевая коллекция
        project: {name: 1, fullName: 1}, // к примеру нас интересуют только поля name и fullName
        getObject: function(obj) { // убедимся в том, что процессы индексируют документы по-очереди
          console.log('pid:', process.pid);
          console.log('obj:', obj);
          return obj;
        },
        init: false // пропустим первоначальный экспорт документов
      }
    ];
    require('mongodb-es-river')(settings, options, function(err) {
      if (err) throw err;
      console.log('All rivers run, pid:', process.pid);
    });
  });
});
Запускаем кластер:

Каждый процесс зарегистрировался в реестре сервисов:

Обновляем fullName юзера:

В консоли приложения наблюдаем следующую картину:

Проверяем как поживает индекс:

Все в елочку.

Останавливаем кластер:

Реестр сервисов снова пуст:

Количество реестров и сервисов миксуем самостоятельно.

Как это было:

За микшерским пультом DJ Baur:
P.S.: Вариант хранения состояния сервиса на каждом из нодов сервиса, использованный в этом модуле, на мой взгляд не самый лучший. Для failover реализации river-сервиса с помощью этого модуля желательно запустить хотя бы пару кластеров на разных машинах.