Страницы

суббота, 20 октября 2018 г.

Sending binary with Node.js via gRPC

Protocol Buffers - протокол сериализации структурированных данных. Нельзя просто взять и скомпилировать: rpc Send (stream bytes) returns (void);, нужно обязательно использовать структуры: rpc Send (stream File) returns (Empty);. В Node.js самый простой способ отправить файл, или любой другой Readable Stream - спайпить его с Writable Stream. Для того чтобы отправить файл через gRPC можно пребразовать поток бинарных данных в поток структурированных данных и наоборот с помощью Transform Stream, как-то так: на клиенте Readable Stream -> Transform Stream -> ClientReadableStream, на сервере ServerReadableStream -> Transform Stream -> Writable Stream.


- stream.js:
const { Transform, Writable } = require('stream')
const { createHash } = require('crypto')
 
class UpStream extends Transform {
  constructor () {
    super({ readableObjectMode: true })
  }
  _transform (chunk, _, cb) {
    this.push({ chunk: chunk })
    cb()
  }
}
 
class DownStream extends Transform {
  constructor () {
    super({ writableObjectMode: true })
  }
  _transform ({ chunk }, _, cb) {
    this.push(Buffer.from(chunk))
    cb()
  }
}
 
class HashStream extends Transform {
  constructor () {
    super()
    this.hash = createHash('sha1')
    this.digest = ''
  }
  _transform (chunk, _, cb) {
    this.hash.update(chunk)
    this.push(chunk)
    cb()
  }
  _flush (cb) {
    this.digest = this.hash.digest('hex')
    cb()
  }
}
 
class DataStream extends Writable {
  constructor () {
    super()
    this._data = []
  }
  _write (chunk, _, cb) {
    this._data.push(chunk)
    cb()
  }
  get data () {
    return Buffer.concat(this._data).toString()
  }
}
 
module.exports = {
  UpStream,
  DownStream,
  HashStream,
  DataStream
}

- server.js:
const grpc = require('grpc')
const protoLoader = require('@grpc/proto-loader')
const path = require('path')
const protoFileName = path.join(__dirname, 'protos/test.proto')
const packageDefinition = protoLoader.loadSync(protoFileName)
const grpcObject = grpc.loadPackageDefinition(packageDefinition)
const server = new grpc.Server()
 
const { DownStream, HashStream, DataStream } = require('./stream')
const send = (upStream, cb) => {
  const downStream = new DownStream()
  const hashStream = new HashStream()
  const dataStream = new DataStream()
  new Promise((resolve, reject) => {
    upStream.on('error', reject)
    downStream.on('error', reject)
    hashStream.on('error', reject)
    dataStream.on('error', reject)
    dataStream.on('finish', () => {
      console.log('digest:', hashStream.digest)
      resolve()
    })
    upStream.pipe(downStream).pipe(hashStream).pipe(dataStream)
  })
    .then(cb)
    .catch((err) => {
      console.error('err:', err)
      cb(err)
    })
}
 
server.addService(grpcObject.test.Sender.service, { send: send })
server.bind('0.0.0.0:50051', grpc.ServerCredentials.createInsecure())
server.start()

- client.js:
const grpc = require('grpc')
const protoLoader = require('@grpc/proto-loader')
const path = require('path')
const protoFileName = path.join(__dirname, 'protos/test.proto')
const packageDefinition = protoLoader.loadSync(protoFileName)
const grpcObject = grpc.loadPackageDefinition(packageDefinition)
const client = new grpcObject.test.Sender('localhost:50051', grpc.credentials.createInsecure())
 
const { UpStream, HashStream } = require('./stream')
const { createReadStream } = require('fs')
const send = (filename) => {
  return new Promise((resolve, reject) => {
    const fileStream = createReadStream(filename)
    const hashStream = new HashStream()
    const upStream = new UpStream()
    const callStream = client.send({}, (err) => {
      if (err) return reject(err)
      console.log('digest:', hashStream.digest)
      resolve()
    })
    fileStream.on('error', reject)
    hashStream.on('error', reject)
    upStream.on('error', reject)
    callStream.on('error', reject)
    fileStream.pipe(hashStream).pipe(upStream).pipe(callStream)
  })
}
send(process.argv[2] || __filename)
  .catch((err) => {
    console.error('err:', err)
  })

Для идентификации файла я решил использовать HashStream, который так же как UpStream и DownStream наследует от Transform Stream.


На сервере содержимое файла хранится в DataStream, который наследует от Writable Stream.