From d3e70ac6a80e3ae20fbbe8bf72def4c63b49084f Mon Sep 17 00:00:00 2001 From: Dmitri Tikhonov Date: Wed, 28 Jun 2023 14:31:54 +0000 Subject: [PATCH 1/2] Pseudo code: how streaming would work --- packages/client/lib/client/RESP2/decoder.ts | 31 ++++++++++++++++++++ packages/client/lib/client/commands-queue.ts | 5 ++++ 2 files changed, 36 insertions(+) diff --git a/packages/client/lib/client/RESP2/decoder.ts b/packages/client/lib/client/RESP2/decoder.ts index 525f118bf30..6c79d700f39 100644 --- a/packages/client/lib/client/RESP2/decoder.ts +++ b/packages/client/lib/client/RESP2/decoder.ts @@ -59,7 +59,38 @@ export default class RESP2Decoder { this.currentStringComposer = this.stringComposer; } + wantStream(want: boolean) { + this.wantStream = want; + } + + private writeStream(chunk: Buffer): void { + while (this.cursor < chunk.length) { + if (!this.type) { + this.currentStringComposer = this.streamComposer; /* TODO to be implemented */ + + this.type = chunk[this.cursor]; + if (++this.cursor >= chunk.length) break; + } + + if (!this.currentStream) { + this.currentStream = this.streamComposer.getStream(); + this.options.onReply(this.currentStream); + } + + const done = this.parseType(chunk, this.type); + if (done === undefined) break; + + this.type = undefined; + this.currentStream = undefined; + } + + this.cursor -= chunk.length; + } + write(chunk: Buffer): void { + if (this.wantStream) + this.writeStream(chunk); + while (this.cursor < chunk.length) { if (!this.type) { this.currentStringComposer = this.options.returnStringsAsBuffers() ? diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 7fffed86580..aec67a35679 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -236,7 +236,12 @@ export default class RedisCommandsQueue { return encoded; } + private currentCommandWantsStream() { + return false; /* TODO */ + } + onReplyChunk(chunk: Buffer): void { + this.#decoder.wantStream( currentCommandWantsStream() ); this.#decoder.write(chunk); } From 36938e5d2fa19c5eec6922c606da7fbae41e2a02 Mon Sep 17 00:00:00 2001 From: Dmitri Tikhonov Date: Wed, 28 Jun 2023 14:43:46 +0000 Subject: [PATCH 2/2] Disambiguate --- packages/client/lib/client/RESP2/decoder.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/client/lib/client/RESP2/decoder.ts b/packages/client/lib/client/RESP2/decoder.ts index 6c79d700f39..9b11aef93de 100644 --- a/packages/client/lib/client/RESP2/decoder.ts +++ b/packages/client/lib/client/RESP2/decoder.ts @@ -60,7 +60,7 @@ export default class RESP2Decoder { } wantStream(want: boolean) { - this.wantStream = want; + this.#wantStream = want; } private writeStream(chunk: Buffer): void { @@ -88,7 +88,7 @@ export default class RESP2Decoder { } write(chunk: Buffer): void { - if (this.wantStream) + if (this.#wantStream) this.writeStream(chunk); while (this.cursor < chunk.length) {