Source: socket.js

const net = require('net');
const protobufjs = require('protobufjs');
const hexSha1 = require('hex-sha1');
const ProtoId = require('./protoid');
const Pb = require('./pb.json');


const ProtoName = {};
Object.keys(ProtoId).forEach((key) => {
  ProtoName[ProtoId[key]] = key;
});

let id = 1;

/**
 * Socket模块
 */

class Socket {
  /**
   * Creates an instance of Socket.
   * @param {string} ip OpenD服务Ip
   * @param {number} port OpenD服务端口
   * @param {object} logger 日志对象
   */
  constructor(ip, port, logger) {
    /**
     * OpenD服务IP
     * @type {string}
     */
    this.ip = ip;
    /**
     * OpenD服务端口
     * @type {number}
     */
    this.port = port;
    /**
     * 日志对象
     * @type {object}
     */
    this.logger = logger;

    id += 1;
    /**
     * socket id,自增,用于区分多个socket。
     * @type {number}
     */
    this.id = id;
    /**
     * socket名称,用于区分多个socket。
     * @type {string}
     */
    this.name = `Socket(${this.id})`;
    /**
     * socket是否已经连接
     * @type {boolean}
     */
    this.isConnect = false;
    /**
     * 请求序列号,自增
     * @type {number}
     */
    this.requestId = 1000; // 请求序列号,自增

    this.isHandStop = false;

    this.root = protobufjs.Root.fromJSON(Pb);

    this.cacheResponseCallback = {}; // 缓存的回调函数
    this.cacheNotifyCallback = {}; // 缓存的通知回调函数
    this.header = null; // 缓存接收的数据包头
    this.recvBuffer = Buffer.allocUnsafe(0); // 缓存接收的数据

    this.socket = new net.Socket();
    this.socket.setKeepAlive(true);
    this.socket.on('error', (data) => {
      this.logger.error(`${this.name} on error: ${data}`);
      this.socket.destroy();
      this.isConnect = false;
    });
    this.socket.on('timeout', (e) => {
      this.logger.error(`${this.name} on timeout.`, e);
      this.socket.destroy();
      this.isConnect = false;
    });
    // 为客户端添加“close”事件处理函数
    this.socket.on('close', () => {
      if (this.isHandStop) return;
      const errMsg = `${this.name} on closed and retry connect on 5 seconds.`;
      this.logger.error(errMsg);
      this.isConnect = false;
      // 5s后重连
      if (this.timerRecontent) return;
      this.timerRecontent = setTimeout(() => {
        this.init();
      }, 5000);
    });
    // 接收数据
    this.socket.on('data', (data) => {
      this.recvBuffer = Buffer.concat([this.recvBuffer, data]);
      this.parseData();
    });
  }
  async init() {
    if (this.isConnect) return;
    await this.connect();
  }
  /**
   * 立即建立连接
   */
  async connect() {
    this.isHandStop = false;
    return new Promise((resolve) => {
      if (this.timerRecontent) {
        clearTimeout(this.timerRecontent);
        this.timerRecontent = null;
      }
      this.socket.connect({
        port: this.port,
        host: this.ip,
        timeout: 1000 * 30,
      }, async () => {
        this.logger.debug(`${this.name} connect success:${this.ip}:${this.port}`);
        this.isConnect = true;
        if (typeof this.connectCallback === 'function') this.connectCallback();
        resolve();
      });
    });
  }
  async close() {
    this.socket.end();
    this.socket.destroy();
    this.isHandStop = true;
    this.logger.info('手动关闭 socket 。');
  }
  /**
   * 设置连接成功的回调函数
   *
   * @param {function} cb
   * @memberof Socket
   */
  onConnect(cb) {
    this.connectCallback = cb;
  }
  /**
   * 注册协议的通知
   *
   * @param {number} protoId 协议id
   * @param {function} callback 回调函数
   */
  subNotify(protoId, callback) {
    this.cacheNotifyCallback[protoId] = callback;
  }
  /**
   * 删除一个通知
   * @param {number} protoId 协议id
   */
  unsubNotify(protoId) {
    if (this.cacheNotifyCallback[protoId]) {
      delete this.cacheNotifyCallback[protoId];
    }
  }
  /**
   * 发送数据
   *
   * @async
   * @param {string} protoName 协议名称
   * @param {object} message 要发送的数据
   */
  send(protoName, message) {
    if (!this.isConnect) return this.logger.warn(`${this.name} 尚未连接,无法发送请求。`);
    const protoId = ProtoId[protoName];
    if (!protoId) return this.logger.warn(`找不到对应的协议Id:${protoName}`);
    // 请求序列号,自增
    if (this.requestId > 1000000) this.requestId = 1000;
    const { requestId } = this;
    this.requestId += 1;

    const request = this.root[protoName].Request;
    const response = this.root[protoName].Response;

    // 处理请求数据
    const reqBuffer = request.encode(request.create({
      c2s: message,
    })).finish();
    const sha1 = hexSha1(reqBuffer);
    const sha1Buffer = new Uint8Array(20).map((item, index) => Number(`0x${sha1.substr(index * 2, 2)}`));
    this.logger.debug(`request:${protoName}(${protoId}),reqId:${requestId}`);
    // 处理包头
    const buffer = Buffer.concat([
      Buffer.from('FT'), // 包头起始标志,固定为“FT”
      Buffer.from(new Uint32Array([protoId]).buffer), // 协议ID
      Buffer.from(new Uint8Array([0]).buffer), // 协议格式类型,0为Protobuf格式,1为Json格式
      Buffer.from(new Uint8Array([0]).buffer), // 协议版本,用于迭代兼容, 目前填0
      Buffer.from(new Uint32Array([requestId]).buffer), // 包序列号,用于对应请求包和回包, 要求递增
      Buffer.from(new Uint32Array([reqBuffer.length]).buffer), // 包体长度
      Buffer.from(sha1Buffer.buffer), // 包体原始数据(解密后)的SHA1哈希值
      Buffer.from(new Uint8Array([0, 0, 0, 0, 0, 0, 0, 0]).buffer), // 保留8字节扩展
      reqBuffer,
    ]);
    // 发送请求,处理回调
    this.socket.write(buffer);
    return new Promise((resolve, reject) => {
      this.cacheResponseCallback[requestId] = (responseBuffer) => {
        const result = response.decode(responseBuffer).toJSON();
        if (result.retType === 0) return resolve(result.s2c);
        const errMsg = `服务器返回结果失败,request:${protoName}(${protoId}),retType:${result.retType},reqId:${requestId},errMsg:${result.retMsg}`;
        this.logger.error(errMsg);
        return reject(new Error(errMsg));
      };
    });
  }
  /**
   * 解析包体数据
   */
  parseData() {
    const headerLen = 44; // 包头长度
    let bodyBuffer = null; // 包体buffer
    let bodyLen = 0; // 包体长度
    let reqId = null; // 请求序列号
    let protoId = null; // 请求协议Id
    let bodySha1 = null; // 包体sha1
    // 先处理包头
    if (!this.header && this.recvBuffer.length >= headerLen) {
      let recvSha1 = new Array(21).join('0').split('').map((item, index) => {
        let str = this.recvBuffer.readUInt8(16 + index).toString(16);
        if (str.length === 1) str = `0${str}`;
        return str;
      });
      recvSha1 = recvSha1.join('');
      this.header = {
        // 包头起始标志,固定为“FT”
        szHeaderFlag: String.fromCharCode(this.recvBuffer.readUInt8(0)) + String.fromCharCode(this.recvBuffer.readUInt8(1)),
        nProtoID: this.recvBuffer.readUInt32LE(2), // 协议ID
        nProtoFmtType: this.recvBuffer.readUInt8(6), // 协议格式类型,0为Protobuf格式,1为Json格式
        nProtoVer: this.recvBuffer.readUInt8(7), // 协议版本,用于迭代兼容
        nSerialNo: this.recvBuffer.readUInt32LE(8), // 包序列号
        nBodyLen: this.recvBuffer.readUInt32LE(12), // 包体长度
        arrBodySHA1: recvSha1, // 包体原数据(解密后)的SHA1哈希值
        arrReserved: this.recvBuffer.slice(36, 44), // 保留8字节扩展
      };
      if (this.header.szHeaderFlag !== 'FT') throw new Error('接收的包头数据格式错误');
      this.logger.debug(`response:${ProtoName[this.header.nProtoID]}(${this.header.nProtoID}),reqId:${this.header.nSerialNo},bodyLen:${bodyLen}`);
    }

    // 已经接收指定包体长度的全部数据,切割包体buffer
    if (this.header && this.recvBuffer.length >= this.header.nBodyLen + headerLen) {
      reqId = this.header.nSerialNo;
      protoId = this.header.nProtoID;
      bodyLen = this.header.nBodyLen;
      bodySha1 = this.header.arrBodySHA1;
      this.header = null;

      bodyBuffer = this.recvBuffer.slice(44, bodyLen + headerLen);
      this.recvBuffer = this.recvBuffer.slice(bodyLen + headerLen);

      const sha1 = hexSha1(bodyBuffer);
      if (sha1 !== bodySha1) {
        throw new Error(`接收的包体sha1加密错误:${bodySha1},本地sha1:${sha1}`);
      }
      // 交给回调处理包体数据
      if (this.cacheResponseCallback[reqId]) {
        this.cacheResponseCallback[reqId](bodyBuffer);
        delete this.cacheResponseCallback[reqId];
      }
      // 通知模块
      if (this.cacheNotifyCallback[protoId]) {
        try {
          // 加载proto协议文件
          const protoName = ProtoName[protoId];
          const response = this.root[protoName].Response;
          const result = response.decode(bodyBuffer).toJSON();
          this.cacheNotifyCallback[protoId](result.s2c);
        } catch (e) {
          const errMsg = `通知回调执行错误,response:${ProtoName[protoId]}(${protoId}),reqId:${reqId},bodyLen:${bodyLen},堆栈:${e.stack}`;
          this.logger.error(errMsg);
          throw new Error(errMsg);
        }
      }
      if (this.recvBuffer.length > headerLen) this.parseData();
    }
  }
}
module.exports = Socket;