const net = require('net');
const protobufjs = require('protobufjs');
const hexSha1 = require('hex-sha1');
const ProtoId = require('./protoid');
const Pb = require('./pb.json');
// Reverse lookup table: protocol id -> protocol name (ProtoId maps name -> id).
const ProtoName = {};
for (const [protoName, protoId] of Object.entries(ProtoId)) {
  ProtoName[protoId] = protoName;
}
// Module-wide counter used to hand out a distinct id to every Socket instance.
let id = 1;
/**
 * TCP client for an OpenD gateway.
 *
 * Every message on the wire is a 44-byte header followed by a protobuf body:
 *
 *   offset  size  field
 *   0       2     start flag, always "FT"
 *   2       4     protocol id (uint32, little-endian)
 *   6       1     body format (0 = protobuf, 1 = JSON)
 *   7       1     protocol version
 *   8       4     serial number pairing a response with its request (uint32 LE)
 *   12      4     body length in bytes (uint32 LE)
 *   16      20    SHA1 of the body
 *   36      8     reserved
 *
 * The class reconnects automatically 5 seconds after an unexpected close and
 * dispatches incoming packets to per-request callbacks and per-protocol
 * notification callbacks.
 */
class Socket {
  /**
   * Creates an instance of Socket. The underlying `net.Socket` is created and
   * its event handlers are installed here; no connection is attempted until
   * `init()` or `connect()` is called.
   * @param {string} ip OpenD service IP / host
   * @param {number} port OpenD service port
   * @param {object} logger logger object; `debug`, `info`, `warn` and `error` are called on it
   */
  constructor(ip, port, logger) {
    /**
     * OpenD service IP
     * @type {string}
     */
    this.ip = ip;
    /**
     * OpenD service port
     * @type {number}
     */
    this.port = port;
    /**
     * Logger object
     * @type {object}
     */
    this.logger = logger;
    id += 1;
    /**
     * Socket id taken from the module-level counter; distinguishes multiple sockets.
     * @type {number}
     */
    this.id = id;
    /**
     * Socket name used as a prefix in log messages.
     * @type {string}
     */
    this.name = `Socket(${this.id})`;
    /**
     * Whether the socket is currently connected.
     * @type {boolean}
     */
    this.isConnect = false;
    /**
     * Serial number of the next request; incremented by `send()`.
     * @type {number}
     */
    this.requestId = 1000;
    /**
     * True after a manual `close()`; suppresses the automatic reconnect.
     * @type {boolean}
     */
    this.isHandStop = false;
    /**
     * Handle of the pending reconnect timer, or null when none is scheduled.
     */
    this.timerRecontent = null;
    /**
     * Resolvers of `connect()` promises that are still waiting for a connection.
     * @type {Array<function>}
     */
    this.connectWaiters = [];
    this.root = protobufjs.Root.fromJSON(Pb);
    this.cacheResponseCallback = {}; // serial number -> response handler
    this.cacheNotifyCallback = {}; // protocol id -> notification handler
    this.header = null; // header of the packet whose body is still incomplete
    this.recvBuffer = Buffer.alloc(0); // received bytes not yet consumed
    this.socket = new net.Socket();
    this.socket.setKeepAlive(true);
    // One persistent 'connect' handler. Passing a listener to socket.connect()
    // on every attempt leaves the listeners of failed attempts attached, so the
    // user callback would run once per attempt when a retry finally succeeds.
    this.socket.on('connect', () => {
      this.logger.debug(`${this.name} connect success:${this.ip}:${this.port}`);
      this.isConnect = true;
      if (typeof this.connectCallback === 'function') this.connectCallback();
      const waiters = this.connectWaiters;
      this.connectWaiters = [];
      waiters.forEach((resolve) => resolve());
    });
    this.socket.on('error', (data) => {
      this.logger.error(`${this.name} on error: ${data}`);
      this.socket.destroy();
      this.isConnect = false;
    });
    this.socket.on('timeout', (e) => {
      this.logger.error(`${this.name} on timeout.`, e);
      this.socket.destroy();
      this.isConnect = false;
    });
    this.socket.on('close', () => {
      this.isConnect = false;
      // A half-received packet from the dead connection must not be glued
      // onto the bytes of the next connection.
      this.header = null;
      this.recvBuffer = Buffer.alloc(0);
      if (this.isHandStop) return;
      const errMsg = `${this.name} on closed and retry connect on 5 seconds.`;
      this.logger.error(errMsg);
      // Reconnect after 5 seconds unless a reconnect is already scheduled.
      if (this.timerRecontent) return;
      this.timerRecontent = setTimeout(() => {
        this.timerRecontent = null;
        this.init();
      }, 5000);
    });
    // Accumulate received bytes and parse as many complete packets as possible.
    this.socket.on('data', (data) => {
      this.recvBuffer = Buffer.concat([this.recvBuffer, data]);
      this.parseData();
    });
  }

  /**
   * Connects unless the socket is already connected.
   * @returns {Promise<void>}
   */
  async init() {
    if (this.isConnect) return;
    await this.connect();
  }

  /**
   * Starts a connection attempt immediately, cancelling any scheduled reconnect.
   * The returned promise resolves on the next successful connection; it never
   * rejects (a failed attempt is retried through the 'close' handler).
   * @returns {Promise<void>}
   */
  async connect() {
    this.isHandStop = false;
    return new Promise((resolve) => {
      if (this.timerRecontent) {
        clearTimeout(this.timerRecontent);
        this.timerRecontent = null;
      }
      this.connectWaiters.push(resolve);
      // NOTE(review): Node documents `timeout` for net.createConnection();
      // confirm whether socket.connect() honours it.
      this.socket.connect({
        port: this.port,
        host: this.ip,
        timeout: 1000 * 30,
      });
    });
  }

  /**
   * Closes the socket manually and disables the automatic reconnect.
   * @returns {Promise<void>}
   */
  async close() {
    // Flag the manual stop and cancel a scheduled reconnect first, otherwise
    // a timer armed by an earlier drop would reopen the socket after close().
    this.isHandStop = true;
    if (this.timerRecontent) {
      clearTimeout(this.timerRecontent);
      this.timerRecontent = null;
    }
    this.isConnect = false;
    this.socket.end();
    this.socket.destroy();
    this.logger.info('手动关闭 socket 。');
  }

  /**
   * Sets the callback invoked after every successful connection.
   *
   * @param {function} cb
   * @memberof Socket
   */
  onConnect(cb) {
    this.connectCallback = cb;
  }

  /**
   * Registers the notification callback of a protocol, replacing any previous one.
   *
   * @param {number} protoId protocol id
   * @param {function} callback called with the decoded `s2c` payload
   */
  subNotify(protoId, callback) {
    this.cacheNotifyCallback[protoId] = callback;
  }

  /**
   * Removes the notification callback of a protocol.
   * @param {number} protoId protocol id
   */
  unsubNotify(protoId) {
    delete this.cacheNotifyCallback[protoId];
  }

  /**
   * Encodes and sends one request.
   *
   * @param {string} protoName protocol name (a key of ProtoId)
   * @param {object} message payload placed in the request's `c2s` field
   * @returns {Promise<object>|undefined} promise resolving with the response's
   *   `s2c` payload and rejecting when `retType` is non-zero or the response
   *   cannot be decoded; `undefined` (after a warning) when the socket is not
   *   connected or the protocol name is unknown
   */
  send(protoName, message) {
    if (!this.isConnect) return this.logger.warn(`${this.name} 尚未连接,无法发送请求。`);
    const protoId = ProtoId[protoName];
    if (!protoId) return this.logger.warn(`找不到对应的协议Id:${protoName}`);
    // Serial numbers increase and wrap back to 1000 once they pass 1000000.
    if (this.requestId > 1000000) this.requestId = 1000;
    const { requestId } = this;
    this.requestId += 1;
    const request = this.root[protoName].Request;
    const response = this.root[protoName].Response;
    const reqBuffer = request.encode(request.create({
      c2s: message,
    })).finish();
    const sha1 = hexSha1(reqBuffer);
    this.logger.debug(`request:${protoName}(${protoId}),reqId:${requestId}`);
    // Write the header explicitly little-endian so it matches the
    // readUInt32LE calls in parseData() regardless of host byte order.
    const headerLen = 44;
    const header = Buffer.alloc(headerLen); // zero-filled: format, version and reserved bytes stay 0
    header.write('FT', 0, 'latin1'); // start flag
    header.writeUInt32LE(protoId, 2); // protocol id
    header.writeUInt32LE(requestId, 8); // serial number
    header.writeUInt32LE(reqBuffer.length, 12); // body length
    Buffer.from(sha1, 'hex').copy(header, 16); // SHA1 of the body
    this.socket.write(Buffer.concat([header, reqBuffer]));
    return new Promise((resolve, reject) => {
      this.cacheResponseCallback[requestId] = (responseBuffer) => {
        let result;
        try {
          result = response.decode(responseBuffer).toJSON();
        } catch (e) {
          // Hand the decode failure to the requester instead of leaving the promise pending.
          return reject(e);
        }
        if (result.retType === 0) return resolve(result.s2c);
        const errMsg = `服务器返回结果失败,request:${protoName}(${protoId}),retType:${result.retType},reqId:${requestId},errMsg:${result.retMsg}`;
        this.logger.error(errMsg);
        return reject(new Error(errMsg));
      };
    });
  }

  /**
   * Consumes every complete packet currently held in `recvBuffer` and
   * dispatches each body to the matching response and notification callbacks.
   * An incomplete packet stays buffered until more data arrives.
   * @throws {Error} when a header does not start with "FT", when the body's
   *   SHA1 differs from the header's, or when a notification callback fails
   */
  parseData() {
    const headerLen = 44;
    // Iterate rather than recurse so a burst of small packets cannot deepen the stack.
    for (;;) {
      if (!this.header) {
        if (this.recvBuffer.length < headerLen) return;
        const szHeaderFlag = this.recvBuffer.toString('latin1', 0, 2);
        // Validate before caching so a bad header is never kept for later framing.
        if (szHeaderFlag !== 'FT') throw new Error('接收的包头数据格式错误');
        this.header = {
          szHeaderFlag, // start flag, always "FT"
          nProtoID: this.recvBuffer.readUInt32LE(2), // protocol id
          nProtoFmtType: this.recvBuffer.readUInt8(6), // 0 = protobuf, 1 = JSON
          nProtoVer: this.recvBuffer.readUInt8(7), // protocol version
          nSerialNo: this.recvBuffer.readUInt32LE(8), // serial number
          nBodyLen: this.recvBuffer.readUInt32LE(12), // body length
          arrBodySHA1: this.recvBuffer.toString('hex', 16, 36), // SHA1 of the body as hex
          arrReserved: this.recvBuffer.subarray(36, 44), // reserved bytes
        };
        this.logger.debug(`response:${ProtoName[this.header.nProtoID]}(${this.header.nProtoID}),reqId:${this.header.nSerialNo},bodyLen:${this.header.nBodyLen}`);
      }
      const reqId = this.header.nSerialNo;
      const protoId = this.header.nProtoID;
      const bodyLen = this.header.nBodyLen;
      const bodySha1 = this.header.arrBodySHA1;
      const packetLen = headerLen + bodyLen;
      // Body not fully received yet: keep the parsed header and wait for more data.
      if (this.recvBuffer.length < packetLen) return;
      this.header = null;
      const bodyBuffer = this.recvBuffer.subarray(headerLen, packetLen);
      this.recvBuffer = this.recvBuffer.subarray(packetLen);
      const sha1 = hexSha1(bodyBuffer);
      if (sha1 !== bodySha1) {
        throw new Error(`接收的包体sha1加密错误:${bodySha1},本地sha1:${sha1}`);
      }
      // Response to a pending request: remove the entry first so it is
      // released even when the callback throws.
      const responseCallback = this.cacheResponseCallback[reqId];
      if (responseCallback) {
        delete this.cacheResponseCallback[reqId];
        responseCallback(bodyBuffer);
      }
      // Notification subscribed for this protocol id.
      if (this.cacheNotifyCallback[protoId]) {
        try {
          const protoName = ProtoName[protoId];
          const response = this.root[protoName].Response;
          const result = response.decode(bodyBuffer).toJSON();
          this.cacheNotifyCallback[protoId](result.s2c);
        } catch (e) {
          const errMsg = `通知回调执行错误,response:${ProtoName[protoId]}(${protoId}),reqId:${reqId},bodyLen:${bodyLen},堆栈:${e.stack}`;
          this.logger.error(errMsg);
          throw new Error(errMsg);
        }
      }
    }
  }
}
// The Socket class is this module's only export (CommonJS).
module.exports = Socket;