Страницы

воскресенье, 31 августа 2014 г.

MongoDB Tailable Cursors

В повседневной работе я нередко использую непереводимую игру слов с использованием местных идиоматических выражений. Как к примеру сказать по-русски "_id"? Я так и говорю: "underscore id". Сегодня расскажу про "tailable cursors" - курсоры, которые благодаря тому, что не закрываются сразу же после получения заключительной "порции" данных, отвечающих поисковому запросу, как это просходит с "обычными" курсорами, продолжают получать документы сразу же после их добавления в целевую коллекцию. Их можно применять только по отношению к "capped collections".

Для начала предлагаю создать эту самую "ограниченную" коллекцию, я назвал ее "messages" так как планирую хранить в ней сообщения простенького чата, который должен получиться в итоге:

Теперь пишем код приложения, которое будет выводить в консоль свежие документы по мере их поступления в коллекцию, а заодно и добавлять документы с целью тестирования работы "tailable" курсора:
var Db = require('mongodb').Db;
var Server = require('mongodb').Server;
var config = {db: 'test', host: 'localhost', port: 27017, collection: 'messages'};
var db = new Db(config.db, new Server(config.host, config.port, { auto_reconnect: true }), {w: 'majority', safe: true});
db.open(function(err) {
  if (err) throw err;
  console.log('MongoDB connected to db %s on %s:%d', config.db, config.host, config.port);
  db.collection(config.collection, function(err, collection) {
    if (err) throw err;
    collection.isCapped(function(err, capped) {
      if (err) throw err;
      if (!capped) throw new Error('Collection ' + config.collection + ' is not capped');
      var latest = collection.find({}).sort({ $natural: -1 }).limit(1);
      latest.nextObject(function(err, item) {
        if (err) throw err;
        if (item) return gogogo(collection, item._id);
        console.log('Inserting first document...');
        collection.insert({a: 0}, function(err, items) {
          if (err) throw err;
          gogogo(collection, items[0]._id);
        });
      });
    });
  });
});

function gogogo(collection, oid) {
  console.log('Start tailing on %s collection...', config.collection);
  var query = { _id: { $gt: oid }};
  var options = { tailable: true, awaitdata: true, numberOfRetries: -1, tailableRetryInterval: 200 };
  var cursor = collection.find(query, options).sort({ $natural: 1 });
  (function next() {
    cursor.nextObject(function(err, item) {
      if (err) throw err;
      console.log(item);
      next();
    });
  })();
  // тест
  var i = 1;
  (function test() {
    setTimeout(function() {
      collection.insert({a: i}, function(err) {
        if (err) throw err;
        i += 1;
        test();
      });
    }, 5000);
  })();
}

Запускаем приложение, через 15 секунд наблюдаем следующую картину:

Для того, чтобы курсор "зацепился" за коллекцию необходимо чтобы самый первый, я бы сказал "initial" запрос - latest.nextObject... -  вернул документ, что предусмотрено в коде и собственно почему у нас в консоли появилось сообщение Inserting first document..., после чего мы вставили в коллекцию документ {a: 0}.

Итак, код отработал как надо, курсор отловил свежие документы информация о которых благополучно отобразилась в консоли.

Переходим к созданию простенького чата. За основу я взял код чата на TCP-сокетах - проще некуда:
- server.js:
var net = require('net');
var pubsub = require('./pubsub');

var server = net.createServer(function(socket) {
  socket.setEncoding('utf8');
  console.log('--- socket connected ---\nfrom: %s', socket.remoteAddress + ':' + socket.remotePort);  
  pubsub.emit('join', socket);  
  
  socket.on('data', function(data) {
    data = data.replace(/^\s+|\s+$|\r\n/g, '');
    if (!data) return this.write('\033[1AEmpty message!');
    console.log('--- socket data ---\n%s', data);
    pubsub.emit('broadcast', this._id, data);    
  });

  socket.on('close', function() {
    console.log('--- socket closed ---');
    pubsub.emit('leave', this);
  });

  socket.on('end', function() {
    console.log('--- socket end ---');   
    pubsub.emit('leave', this);
  });
  
  socket.on('error', function(e) {
    console.log('--- server error ---\ncode: %s', e.code);
  });
});
server.listen(8124, function() {
  console.log('Chamber of Secrets is opened on port %d...', this.address()['port']);  
});

- client.js:
var net = require('net');

var socket = new net.Socket();
socket.setEncoding('utf8');

socket.connect('8124', 'localhost', function() {
  console.log('--- connected to server ---');  
});

process.stdin.resume();

process.stdin.on('data', function(data) {
  socket.write(data);
});

socket.on('data', function(data) {
  console.log(data);
});

socket.on('close', function() {
  console.log('--- connection closed ---');
  process.exit();
});

socket.on('end', function() {
  console.log('--- connection end ---');
});

socket.on('error', function(e) {
  console.log('--- socket error ---\ncode: %s', e.code);
});

- pubsub.js:
var events = require('events');
var pubsub = new events.EventEmitter();
pubsub.clients = {};
pubsub.subscriptions = {};

pubsub.on('join', function(socket) {
  socket['_id'] = socket.remoteAddress + ':' + socket.remotePort;    
  this.clients[socket._id] = socket;

  this.subscriptions[socket._id] = function(socket_id, data) {
    data = new Date().toLocaleTimeString() + '  ' + socket_id + ' >>> ' + data;
    if (socket._id === socket_id) data = '\033[1A' + data;
    this.clients[socket._id].write(data);
  }
  this.on('broadcast', this.subscriptions[socket._id]);

  console.log('--- socket saved ---\nusers online: %d', this.listeners('broadcast').length);
  socket.write('Welcome to Chamber of Secrets!');
});

pubsub.on('leave', function(socket) {
  delete pubsub.clients[socket._id];
  this.removeListener('broadcast', this.subscriptions[socket._id]);    
  socket.destroy();
  console.log('--- socket destroyed ---\nusers online: %d', this.listeners('broadcast').length);
});

pubsub.on('error', function(e) {
  console.log('--- pubsub error ---\n%s', e.message);
});

module.exports = pubsub;

Запускаем сервер, пару клиентов, клиенты приветствуют друг друга, после чего консоль сервера выглядит как-то так:

Клиенты в свою очередь могут созерцать сообщения чата:

На текущий момент сообщения вообще никак не хранятся.

Создадим модуль по имени db на основе нашего приложения app.js:
- db.js:
var pubsub = require('./pubsub');

var Db = require('mongodb').Db;
var Server = require('mongodb').Server;
var config = {db: 'test', host: 'localhost', port: 27017, collection: 'messages'};
var db = new Db(config.db, new Server(config.host, config.port, { auto_reconnect: true }), {w: 'majority', safe: true});
db.open(function(err) {
  if (err) throw err;
  console.log('MongoDB connected to db %s on %s:%d', config.db, config.host, config.port);
  db.collection(config.collection, function(err, collection) {
    if (err) throw err;
    collection.isCapped(function(err, capped) {
      if (err) throw err;
      if (!capped) throw new Error('Collection ' + config.collection + ' is not capped');
      var latest = collection.find({}).sort({ $natural: -1 }).limit(1);
      latest.nextObject(function(err, item) {
        if (err) throw err;
        if (item) return gogogo(collection, item._id);
        console.log('Inserting first document...');
        collection.insert({a: 0}, function(err, items) {
          if (err) throw err;
          gogogo(collection, items[0]._id);
        });
      });
    });
  });
});

function gogogo(collection, oid) {
  console.log('Start tailing on %s collection...', config.collection);
  var query = { _id: { $gt: oid }};
  var options = { tailable: true, awaitdata: true, numberOfRetries: -1, tailableRetryInterval: 200 };
  var cursor = collection.find(query, options).sort({ $natural: 1 });
  (function next() {
    cursor.nextObject(function(err, item) {
      if (err) throw err;
      pubsub.emit('broadcast', item.from, item.message);
      next();
    });
  })();
}

module.exports.insert = function(client_id, message, cb) {
  db.collection(config.collection).insert({message: message, from: client_id}, cb);
};
module.exports.find = function(limit, cb) {
  db.collection(config.collection).find({}).sort({ $natural: -1 }).limit(limit).toArray(cb);
};

Теперь вместо вывода в консоль свежего документа код эмитит событие broadcast модуля pubsub. Кроме того созданный модуль экспортирует пару методов для создания и извлечения документов коллекции messages.

Редактируем код сервера:
- server.js:
var net = require('net');
var pubsub = require('./pubsub');
var db = require('./db');

var server = net.createServer(function(socket) {
  socket.setEncoding('utf8');
  console.log('--- socket connected ---\nfrom: %s', socket.remoteAddress + ':' + socket.remotePort);  
  
  pubsub.emit('join', socket, function() {
    db.find(3, function(err, items) {
      if (err) throw err;
      var str = '\n' + items.map(function(item) {
        return item._id.getTimestamp().toLocaleTimeString() + '  ' + item.from + ' >>> ' + item.message;
      }).reverse().join('\n');      
      socket.write(str);
    });
  });  
  
  socket.on('data', function(data) {
    data = data.replace(/^\s+|\s+$|\r\n/g, '');
    if (!data) return this.write('\033[1AEmpty message!');
    console.log('--- socket data ---\n%s', data);    
    db.insert(this._id, data, function(err) {
      if (err) throw (err);      
    });
  });

  socket.on('close', function() {
    console.log('--- socket closed ---');
    pubsub.emit('leave', this);
  });

  socket.on('end', function() {
    console.log('--- socket end ---');   
    pubsub.emit('leave', this);
  });
  
  socket.on('error', function(e) {
    console.log('--- server error ---\ncode: %s', e.code);
  });
});
server.listen(8124, function() {
  console.log('Chamber of Secrets is opened on port %d...', this.address()['port']);  
});

Отличия от первоначального варианта:
- сразу после присоединения к чату - pubsub.emit('join'...-  мы получаем последние три сообщения и отдаем их новому клиенту - db.find(...
-  вместо того, чтобы эмитить событие 'broadcast' - pubsub.emit('broadcast'...,  теперь мы просто добавляем новое сообщение - db.insert(..., а событие эмитится уже в модуле db - в функции с многозначительным названием gogogo :) - в тот самый момент, когда "tailable" курсор получает свежесозданный документ коллекции messages.

Изменения в модуле pubsub ограничиваются появлением функции обратного вызова в списке аргументов функции, выполняемой в ответ на событие подключения к чату нового клиента:
- pubsub.js:
var events = require('events');
var pubsub = new events.EventEmitter();
pubsub.clients = {};
pubsub.subscriptions = {};

pubsub.on('join', function(socket, cb) {
  socket['_id'] = socket.remoteAddress + ':' + socket.remotePort;    
  this.clients[socket._id] = socket;

  this.subscriptions[socket._id] = function(socket_id, data) {
    data = new Date().toLocaleTimeString() + '  ' + socket_id + ' >>> ' + data;
    if (socket._id === socket_id) data = '\033[1A' + data;
    this.clients[socket._id].write(data);
  }
  this.on('broadcast', this.subscriptions[socket._id]);

  console.log('--- socket saved ---\nusers online: %d', this.listeners('broadcast').length);
  socket.write('Welcome to Chamber of Secrets!');
  cb();
});

pubsub.on('leave', function(socket) {
  delete pubsub.clients[socket._id];
  this.removeListener('broadcast', this.subscriptions[socket._id]);    
  socket.destroy();
  console.log('--- socket destroyed ---\nusers online: %d', this.listeners('broadcast').length);
});

pubsub.on('error', function(e) {
  console.log('--- pubsub error ---\n%s', e.message);
});

module.exports = pubsub;

Запускаем сервер, подключаем первого клиента, пишем в чат несколько сообщений:

По понятным причинам первые три сообщения, которые получил клиент при подключении выглядят странно, так как три последних сообщения на тот момент - это тестовые документы коллекции messages, созданные в процессе тестирования приложения app.js в самом начале повествования и выглядят они следующим образом:

Подключаем второго клиента, пишем сообщение в чат:

Здесь уже все в елочку. На сервере в этот момент тоже все по фэн-шую:

Таким образом мы получили реализацию pub/sub системы используя "tailable" (не знаю как правильно перевести на русский язык это слово, да и не уверен что имеет смысл это делать) курсор.

Теперь можно масштабировать приложение "в ширину" - разворачивать несколько серверов, хотя вряд ли это имеет смысл в случае нашего чата :).