diff --git a/packages/client/lib/client/RESP2/decoder.ts b/packages/client/lib/client/RESP2/decoder.ts index 525f118bf30..9b11aef93de 100644 --- a/packages/client/lib/client/RESP2/decoder.ts +++ b/packages/client/lib/client/RESP2/decoder.ts @@ -59,7 +59,38 @@ export default class RESP2Decoder { this.currentStringComposer = this.stringComposer; } + wantStream(want: boolean) { + this.#wantStream = want; + } + + private writeStream(chunk: Buffer): void { + while (this.cursor < chunk.length) { + if (!this.type) { + this.currentStringComposer = this.streamComposer; /* TODO to be implemented */ + + this.type = chunk[this.cursor]; + if (++this.cursor >= chunk.length) break; + } + + if (!this.currentStream) { + this.currentStream = this.streamComposer.getStream(); + this.options.onReply(this.currentStream); + } + + const done = this.parseType(chunk, this.type); + if (done === undefined) break; + + this.type = undefined; + this.currentStream = undefined; + } + + this.cursor -= chunk.length; + } + write(chunk: Buffer): void { + if (this.#wantStream) + this.writeStream(chunk); + while (this.cursor < chunk.length) { if (!this.type) { this.currentStringComposer = this.options.returnStringsAsBuffers() ? diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 7fffed86580..aec67a35679 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -236,7 +236,12 @@ export default class RedisCommandsQueue { return encoded; } + private currentCommandWantsStream() { + return false; /* TODO */ + } + onReplyChunk(chunk: Buffer): void { + this.#decoder.wantStream( currentCommandWantsStream() ); this.#decoder.write(chunk); }