Properly re-assemble all data in http requests before handling it (!162)

Remove unneeded HttpBufferHandler

-----------

The old code processed each chunk of data as an entire request, which is not correct. It was observed split data after ~14600 bytes (on a 1 gig lan connection). I think it was worse on remote connections.

This was the cause of the "unknown compression method" and invalid json parse errors when saving the profile.

Co-authored-by: Decoy <redacted@example.com>
Reviewed-on: https://dev.sp-tarkov.com/SPT-AKI/Server/pulls/162
Reviewed-by: Terkoiz <terkoiz@noreply.dev.sp-tarkov.com>
Co-authored-by: ree <ree@noreply.dev.sp-tarkov.com>
Co-committed-by: ree <ree@noreply.dev.sp-tarkov.com>
This commit is contained in:
ree 2023-10-30 09:23:30 +00:00 committed by chomp
parent fe703b34ec
commit 9fa0bcc705
2 changed files with 35 additions and 77 deletions

View File

@ -5,7 +5,6 @@ import { inject, injectAll, injectable } from "tsyringe";
import { Serializer } from "@spt-aki/di/Serializer"; import { Serializer } from "@spt-aki/di/Serializer";
import { ILogger } from "@spt-aki/models/spt/utils/ILogger"; import { ILogger } from "@spt-aki/models/spt/utils/ILogger";
import { HttpRouter } from "@spt-aki/routers/HttpRouter"; import { HttpRouter } from "@spt-aki/routers/HttpRouter";
import { HttpBufferHandler } from "@spt-aki/servers/http/HttpBufferHandler";
import { IHttpListener } from "@spt-aki/servers/http/IHttpListener"; import { IHttpListener } from "@spt-aki/servers/http/IHttpListener";
import { LocalisationService } from "@spt-aki/services/LocalisationService"; import { LocalisationService } from "@spt-aki/services/LocalisationService";
import { HttpResponseUtil } from "@spt-aki/utils/HttpResponseUtil"; import { HttpResponseUtil } from "@spt-aki/utils/HttpResponseUtil";
@ -22,20 +21,18 @@ export class AkiHttpListener implements IHttpListener
@inject("RequestsLogger") protected requestsLogger: ILogger, @inject("RequestsLogger") protected requestsLogger: ILogger,
@inject("JsonUtil") protected jsonUtil: JsonUtil, @inject("JsonUtil") protected jsonUtil: JsonUtil,
@inject("HttpResponseUtil") protected httpResponse: HttpResponseUtil, @inject("HttpResponseUtil") protected httpResponse: HttpResponseUtil,
@inject("LocalisationService") protected localisationService: LocalisationService, @inject("LocalisationService") protected localisationService: LocalisationService
@inject("HttpBufferHandler") protected httpBufferHandler: HttpBufferHandler
) )
{ {
} }
public canHandle(_: string, req: IncomingMessage): boolean public canHandle(_: string, req: IncomingMessage): boolean
{ {
return req.method === "GET" || req.method === "PUT" || req.method === "POST"; return req.method === "GET" || req.method === "PUT" || req.method === "POST";
} }
public handle(sessionId: string, req: IncomingMessage, resp: ServerResponse): void public handle(sessionId: string, req: IncomingMessage, resp: ServerResponse): void
{ {
// TODO: cleanup into interface IVerbHandler
switch (req.method) switch (req.method)
{ {
case "GET": case "GET":
@ -44,51 +41,48 @@ export class AkiHttpListener implements IHttpListener
this.sendResponse(sessionId, req, resp, null, response); this.sendResponse(sessionId, req, resp, null, response);
break; break;
} }
// these are handled almost identically.
case "POST": case "POST":
{
req.on("data", (data: any) =>
{
const value = (req.headers["debug"] === "1") ? data.toString() : zlib.inflateSync(data);
const response = this.getResponse(sessionId, req, value);
this.sendResponse(sessionId, req, resp, value, response);
});
break;
}
case "PUT": case "PUT":
{ {
req.on("data", (data) => // Data can come in chunks. Notably, if someone saves their profile (which can be
{ // kinda big), on a slow connection. We need to re-assemble the entire http payload
// receive data // before processing it.
if ("expect" in req.headers)
{ const requestLength = parseInt(req.headers["content-length"]);
const requestLength = parseInt(req.headers["content-length"]); const buffer = Buffer.alloc(requestLength);
let written = 0;
if (!this.httpBufferHandler.putInBuffer(req.headers.sessionid, data, requestLength))
{ req.on("data", (data: any) => {
resp.writeContinue(); data.copy(buffer, written, 0);
} written += data.length;
}
}); });
req.on("end", async () => req.on("end", () =>
{ {
const data = this.httpBufferHandler.getFromBuffer(sessionId); // Contrary to reasonable expectations, the content-encoding is _not_ actually used to
this.httpBufferHandler.resetBuffer(sessionId); // determine if the payload is compressed. All PUT requests are, and POST requests without
// debug = 1 are as well. This should be fixed.
let value = zlib.inflateSync(data); // let compressed = req.headers["content-encoding"] === "deflate";
if (!value) let compressed = req.method === "PUT" || req.headers["debug"] !== "1";
const value = compressed ? zlib.inflateSync(buffer) : buffer;
if (req.headers["debug"] === "1")
{ {
value = data; console.log(value.toString());
} }
const response = this.getResponse(sessionId, req, value); const response = this.getResponse(sessionId, req, value);
this.sendResponse(sessionId, req, resp, value, response); this.sendResponse(sessionId, req, resp, value, response);
}); });
break; break;
} }
default: default:
{ {
this.logger.warning(this.localisationService.getText("unknown_request") + ": " + req.method);
this.logger.warning(this.localisationService.getText("unknown_request"));
break; break;
} }
} }
@ -100,7 +94,7 @@ export class AkiHttpListener implements IHttpListener
let handled = false; let handled = false;
// Check if this is a debug request, if so just send the raw response without transformation. // Check if this is a debug request, if so just send the raw response without transformation.
if (req.headers["debug"] === "1") if (req.headers["debug"] === "1")
{ {
this.sendJson(resp, output, sessionID); this.sendJson(resp, output, sessionID);
} }
@ -127,7 +121,7 @@ export class AkiHttpListener implements IHttpListener
this.requestsLogger.info(`RESPONSE=${this.jsonUtil.serialize(log)}`); this.requestsLogger.info(`RESPONSE=${this.jsonUtil.serialize(log)}`);
} }
} }
public getResponse(sessionID: string, req: IncomingMessage, body: Buffer): string public getResponse(sessionID: string, req: IncomingMessage, body: Buffer): string
{ {
const info = this.getBodyInfo(body, req.url); const info = this.getBodyInfo(body, req.url);
@ -141,7 +135,7 @@ export class AkiHttpListener implements IHttpListener
const log = new Request(req.method, new RequestData(req.url, req.headers, data)); const log = new Request(req.method, new RequestData(req.url, req.headers, data));
this.requestsLogger.info(`REQUEST=${this.jsonUtil.serialize(log)}`); this.requestsLogger.info(`REQUEST=${this.jsonUtil.serialize(log)}`);
} }
let output = this.httpRouter.getResponse(req, info, sessionID); let output = this.httpRouter.getResponse(req, info, sessionID);
/* route doesn't exist or response is not properly set up */ /* route doesn't exist or response is not properly set up */
if (!output) if (!output)
@ -192,10 +186,10 @@ class Request
public type: string, public type: string,
public req: RequestData public req: RequestData
) )
{} {}
} }
class Response class Response
{ {
constructor( constructor(
public type: string, public type: string,

View File

@ -1,36 +0,0 @@
import { injectable } from "tsyringe";
@injectable()
export class HttpBufferHandler
{
protected buffers = {};
public resetBuffer(sessionID: string): void
{
this.buffers[sessionID] = undefined;
}
public putInBuffer(sessionID: any, data: any, bufLength: number): boolean
{
if (this.buffers[sessionID] === undefined || this.buffers[sessionID].allocated !== bufLength)
{
this.buffers[sessionID] = {
written: 0,
allocated: bufLength,
buffer: Buffer.alloc(bufLength)
};
}
const buf = this.buffers[sessionID];
data.copy(buf.buffer, buf.written, 0);
buf.written += data.length;
return buf.written === buf.allocated;
}
public getFromBuffer(sessionID: string): any
{
return this.buffers[sessionID].buffer;
}
}