Skip to content

Commit 3a3dacf

Browse files
committed
store blobs rather than texts
1 parent 2d435bf commit 3a3dacf

File tree

3 files changed

+33
-57
lines changed

3 files changed

+33
-57
lines changed

packages/open-next/src/adapters/composable-cache.ts

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import type { ComposableCacheEntry, ComposableCacheHandler } from "types/cache";
22
import { writeTags } from "utils/cache";
3-
import { fromReadableStream, toReadableStream } from "utils/stream";
43
import { debug } from "./logger";
54

65
const pendingWritePromiseMap = new Map<string, Promise<ComposableCacheEntry>>();
@@ -50,7 +49,7 @@ export default {
5049

5150
return {
5251
...result.value,
53-
value: toReadableStream(result.value.value),
52+
value: result.value.value,
5453
};
5554
} catch (e) {
5655
debug("Cannot read composable cache entry");
@@ -59,39 +58,19 @@ export default {
5958
},
6059

6160
async set(cacheKey: string, pendingEntry: Promise<ComposableCacheEntry>) {
62-
const teedPromise = pendingEntry.then((entry) => {
63-
// Optimization: We avoid consuming and stringifying the stream here,
64-
// because it creates double copies just to be discarded when this function
65-
// ends. This avoids unnecessary memory usage, and reduces GC pressure.
66-
const [stream1, stream2] = entry.value.tee();
67-
return [
68-
{ ...entry, value: stream1 },
69-
{ ...entry, value: stream2 },
70-
] as const;
71-
});
72-
73-
pendingWritePromiseMap.set(
74-
cacheKey,
75-
teedPromise.then(([entry]) => entry),
76-
);
61+
// Store the entry in the pending map so concurrent get() calls can access it
62+
pendingWritePromiseMap.set(cacheKey, pendingEntry);
7763

78-
const [, entryForStorage] = await teedPromise.finally(() => {
64+
const entry = await pendingEntry.finally(() => {
7965
pendingWritePromiseMap.delete(cacheKey);
8066
});
8167

82-
await globalThis.incrementalCache.set(
83-
cacheKey,
84-
{
85-
...entryForStorage,
86-
value: await fromReadableStream(entryForStorage.value),
87-
},
88-
"composable",
89-
);
68+
await globalThis.incrementalCache.set(cacheKey, entry, "composable");
9069

9170
if (globalThis.tagCache.mode === "original") {
9271
const storedTags = await globalThis.tagCache.getByPath(cacheKey);
9372
const tagsToWrite = [];
94-
for (const tag of entryForStorage.tags) {
73+
for (const tag of entry.tags) {
9574
if (!storedTags.includes(tag)) {
9675
tagsToWrite.push({ tag, path: cacheKey });
9776
}

packages/open-next/src/types/cache.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import type { ReadableStream } from "node:stream/web";
2-
31
interface CachedFetchValue {
42
kind: "FETCH";
53
data: {
@@ -143,7 +141,7 @@ export type IncrementalCacheContext =
143141
| SetIncrementalResponseCacheContext;
144142

145143
export interface ComposableCacheEntry {
146-
value: ReadableStream<Uint8Array>;
144+
value: Blob;
147145
tags: string[];
148146
stale: number;
149147
timestamp: number;
@@ -152,7 +150,7 @@ export interface ComposableCacheEntry {
152150
}
153151

154152
export type StoredComposableCacheEntry = Omit<ComposableCacheEntry, "value"> & {
155-
value: string;
153+
value: Blob;
156154
};
157155

158156
export interface ComposableCacheHandler {

packages/tests-unit/tests/adapters/composable-cache.test.ts

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
11
import ComposableCache from "@opennextjs/aws/adapters/composable-cache";
2-
import {
3-
fromReadableStream,
4-
toReadableStream,
5-
} from "@opennextjs/aws/utils/stream";
62
import { vi } from "vitest";
73

84
describe("Composable cache handler", () => {
@@ -19,7 +15,7 @@ describe("Composable cache handler", () => {
1915
timestamp: Date.now(),
2016
expire: Date.now() + 1000,
2117
revalidate: 3600,
22-
value: "test-value",
18+
value: new Blob(["test-value"]),
2319
},
2420
lastModified: Date.now(),
2521
}),
@@ -81,7 +77,7 @@ describe("Composable cache handler", () => {
8177
);
8278
expect(result).toBeDefined();
8379
expect(result?.tags).toEqual(["tag1", "tag2"]);
84-
expect(result?.value).toBeInstanceOf(ReadableStream);
80+
expect(result?.value).toBeInstanceOf(Blob);
8581
});
8682

8783
it("should return undefined when cache entry does not exist", async () => {
@@ -132,7 +128,7 @@ describe("Composable cache handler", () => {
132128
type: "route",
133129
body: "{}",
134130
tags: [],
135-
value: "test-value",
131+
value: new Blob(["test-value"]),
136132
},
137133
lastModified: Date.now(),
138134
});
@@ -185,7 +181,7 @@ describe("Composable cache handler", () => {
185181

186182
it("should return pending write promise if available", async () => {
187183
const pendingEntry = Promise.resolve({
188-
value: toReadableStream("pending-value"),
184+
value: new Blob(["pending-value"]),
189185
tags: ["tag1"],
190186
stale: 0,
191187
timestamp: Date.now(),
@@ -200,7 +196,7 @@ describe("Composable cache handler", () => {
200196
const result = await ComposableCache.get("pending-key");
201197

202198
expect(result).toBeDefined();
203-
expect(result?.value).toBeInstanceOf(ReadableStream);
199+
expect(result?.value).toBeInstanceOf(Blob);
204200

205201
// Wait for set to complete
206202
await setPromise;
@@ -214,8 +210,9 @@ describe("Composable cache handler", () => {
214210

215211
it("should set cache entry and handle tags in original mode", async () => {
216212
tagCache.mode = "original";
213+
const blob = new Blob(["test-value"]);
217214
const entry = {
218-
value: toReadableStream("test-value"),
215+
value: blob,
219216
tags: ["tag1", "tag2"],
220217
stale: 0,
221218
timestamp: Date.now(),
@@ -229,7 +226,7 @@ describe("Composable cache handler", () => {
229226
"test-key",
230227
expect.objectContaining({
231228
tags: ["tag1", "tag2"],
232-
value: "test-value",
229+
value: blob,
233230
}),
234231
"composable",
235232
);
@@ -241,7 +238,7 @@ describe("Composable cache handler", () => {
241238
tagCache.getByPath.mockResolvedValueOnce(["tag1"]);
242239

243240
const entry = {
244-
value: toReadableStream("test-value"),
241+
value: new Blob(["test-value"]),
245242
tags: ["tag1", "tag2", "tag3"],
246243
stale: 0,
247244
timestamp: Date.now(),
@@ -262,7 +259,7 @@ describe("Composable cache handler", () => {
262259
tagCache.getByPath.mockResolvedValueOnce(["tag1", "tag2"]);
263260

264261
const entry = {
265-
value: toReadableStream("test-value"),
262+
value: new Blob(["test-value"]),
266263
tags: ["tag1", "tag2"],
267264
stale: 0,
268265
timestamp: Date.now(),
@@ -279,7 +276,7 @@ describe("Composable cache handler", () => {
279276
tagCache.mode = "nextMode";
280277

281278
const entry = {
282-
value: toReadableStream("test-value"),
279+
value: new Blob(["test-value"]),
283280
tags: ["tag1", "tag2"],
284281
stale: 0,
285282
timestamp: Date.now(),
@@ -293,9 +290,10 @@ describe("Composable cache handler", () => {
293290
expect(tagCache.writeTags).not.toHaveBeenCalled();
294291
});
295292

296-
it("should convert ReadableStream to string", async () => {
293+
it("should store Blob directly", async () => {
294+
const blob = new Blob(["test-content"]);
297295
const entry = {
298-
value: toReadableStream("test-content"),
296+
value: blob,
299297
tags: ["tag1"],
300298
stale: 0,
301299
timestamp: Date.now(),
@@ -308,7 +306,7 @@ describe("Composable cache handler", () => {
308306
expect(incrementalCache.set).toHaveBeenCalledWith(
309307
"test-key",
310308
expect.objectContaining({
311-
value: "test-content",
309+
value: blob,
312310
}),
313311
"composable",
314312
);
@@ -437,8 +435,9 @@ describe("Composable cache handler", () => {
437435
describe("integration tests", () => {
438436
it("should handle complete cache lifecycle", async () => {
439437
// Set a cache entry
438+
const blob = new Blob(["integration-test"]);
440439
const entry = {
441-
value: toReadableStream("integration-test"),
440+
value: blob,
442441
tags: ["integration-tag"],
443442
stale: 0,
444443
timestamp: Date.now(),
@@ -452,7 +451,7 @@ describe("Composable cache handler", () => {
452451
expect(incrementalCache.set).toHaveBeenCalledWith(
453452
"integration-key",
454453
expect.objectContaining({
455-
value: "integration-test",
454+
value: blob,
456455
tags: ["integration-tag"],
457456
}),
458457
"composable",
@@ -462,7 +461,7 @@ describe("Composable cache handler", () => {
462461
incrementalCache.get.mockResolvedValueOnce({
463462
value: {
464463
...entry,
465-
value: "integration-test",
464+
value: blob,
466465
},
467466
lastModified: Date.now(),
468467
});
@@ -473,14 +472,14 @@ describe("Composable cache handler", () => {
473472
expect(result).toBeDefined();
474473
expect(result?.tags).toEqual(["integration-tag"]);
475474

476-
// Convert the stream back to verify content
477-
const content = await fromReadableStream(result!.value);
475+
// Convert the blob back to verify content
476+
const content = await result!.value.text();
478477
expect(content).toBe("integration-test");
479478
});
480479

481480
it("should handle concurrent get/set operations", async () => {
482481
const entry1 = {
483-
value: toReadableStream("concurrent-1"),
482+
value: new Blob(["concurrent-1"]),
484483
tags: ["tag1"],
485484
stale: 0,
486485
timestamp: Date.now(),
@@ -489,7 +488,7 @@ describe("Composable cache handler", () => {
489488
};
490489

491490
const entry2 = {
492-
value: toReadableStream("concurrent-2"),
491+
value: new Blob(["concurrent-2"]),
493492
tags: ["tag2"],
494493
stale: 0,
495494
timestamp: Date.now(),
@@ -513,10 +512,10 @@ describe("Composable cache handler", () => {
513512
expect(results[2]).toBeDefined();
514513
expect(results[3]).toBeDefined();
515514

516-
const content1 = await fromReadableStream(results[2]!.value);
515+
const content1 = await results[2]!.value.text();
517516
expect(content1).toBe("concurrent-1");
518517

519-
const content2 = await fromReadableStream(results[3]!.value);
518+
const content2 = await results[3]!.value.text();
520519
expect(content2).toBe("concurrent-2");
521520
});
522521
});

0 commit comments

Comments
 (0)