Страницы

суббота, 9 декабря 2017 г.

Going to gRPC. Bidirectional streaming

В процессе проектирования одного приложения в какой-то момент я решил, что один сервис должен иметь возможность итерировать по коллекции, за которую отвечает другой. Не просто получать данные в виде списка или потока, а иметь возможность перечислять элементы коллекции по одному, а также прерывать процесс итерации. Не смотря на то, что в итоге ответственность за прерывание процесса досталась итератору, что позволило решению остаться в пределах парадигмы request-response, мне захотелось посмотреть как могло бы выглядеть решение на bidirectional streaming RPC.

Вот что у меня получилось на Node.js:

- server.js:
const grpc = require('grpc')
const path = require('path')
 
const root = path.join(__dirname, 'protos')
const proto = grpc.load({ root, file: 'test.proto' })
 
const max = process.argv[2] ? Number(process.argv[2]) : 10
const numbers = function * (value) {
  for (let i = 0; i < max; i++) {
    yield value + i
  }
}
 
const server = new grpc.Server()
server.addService(proto.test.Streamer.service, {
  stream: (call) => {
    let iterator
    call.on('data', (data) => {
      console.log('data:', data)
      if (data.cancel) {
        return call.end()
      }
      if (!iterator) {
        iterator = numbers(data.value)
      }
      setTimeout(() => {
        const { value, done } = iterator.next()
        if (done) {
          return call.end()
        }
        call.write(value)
      }, 100)
    })
    call.on('end', () => {
      call.end()
      console.log('end')
    })
    call.on('error', (err) => {
      throw err
    })
    console.log('call')
  }
})
server.bind('0.0.0.0:50051', grpc.ServerCredentials.createInsecure())
server.start()

- client.js:
const grpc = require('grpc')
const path = require('path')
 
const root = path.join(__dirname, 'protos')
const proto = grpc.load({ root, file: 'test.proto' })
 
const min = process.argv[2] ? Number(process.argv[2]) : 0
const interval = process.argv[2] ? Number(process.argv[3]) : 5
const max = min + interval
 
const client = new proto.test.Streamer('localhost:50051', grpc.credentials.createInsecure())
 
const call = client.stream()
call.on('data', (data) => {
  console.log('data:', data)
  if (data.value === max) {
    call.write({ cancel: true })
  } else {
    call.write({ cancel: false })
  }
})
call.on('end', () => {
  console.log('end')
})
call.on('error', (err) => {
  throw err
})
call.write({ value: min, cancel: false })
console.log('call')



Хозяйкам на заметку.