Skip to content

Commit 18a917f

Browse files
feat: allow to use the adapter within a Node.js cluster
Related: socketio/socket.io-cluster-adapter#1
1 parent a5b266f commit 18a917f

File tree

9 files changed

+685
-323
lines changed

9 files changed

+685
-323
lines changed

README.md

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,15 @@ The `@socket.io/postgres-adapter` package allows broadcasting packets between mu
99

1010
**Table of contents**
1111

12-
- [Supported features](#supported-features)
13-
- [Installation](#installation)
14-
- [Usage](#usage)
15-
- [License](#license)
12+
<!-- TOC -->
13+
* [Supported features](#supported-features)
14+
* [Installation](#installation)
15+
* [Usage](#usage)
16+
* [Standalone](#standalone)
17+
* [With Node.js cluster](#with-nodejs-cluster)
18+
* [Options](#options)
19+
* [License](#license)
20+
<!-- TOC -->
1621

1722
## Supported features
1823

@@ -31,6 +36,8 @@ npm install @socket.io/postgres-adapter
3136

3237
## Usage
3338

39+
### Standalone
40+
3441
```js
3542
import { Server } from "socket.io";
3643
import { createAdapter } from "@socket.io/postgres-adapter";
@@ -62,6 +69,60 @@ io.adapter(createAdapter(pool));
6269
io.listen(3000);
6370
```
6471

72+
### With Node.js cluster
73+
74+
```js
75+
import cluster from "node:cluster";
76+
import { createServer } from "node:http";
77+
import { availableParallelism } from "node:os";
78+
import { Server } from "socket.io";
79+
import { setupPrimary } from "@socket.io/postgres-adapter"
80+
import { createAdapter } from "@socket.io/cluster-adapter";
81+
import pg from "pg";
82+
83+
if (cluster.isPrimary) {
84+
const pool = new pg.Pool({
85+
user: "postgres",
86+
password: "changeit",
87+
});
88+
89+
await pool.query(`
90+
CREATE TABLE IF NOT EXISTS socket_io_attachments (
91+
id bigserial UNIQUE,
92+
created_at timestamptz DEFAULT NOW(),
93+
payload bytea
94+
);
95+
`);
96+
97+
setupPrimary(pool);
98+
99+
for (let i = 0; i < availableParallelism(); i++) {
100+
cluster.fork();
101+
}
102+
} else {
103+
const io = new Server({
104+
adapter: createAdapter(),
105+
});
106+
107+
io.on("connection", (socket) => {
108+
/* ... */
109+
});
110+
111+
io.listen(3000);
112+
}
113+
```
114+
115+
## Options
116+
117+
| Name | Description | Default value |
118+
|---------------------|------------------------------------------------------------------------------------------------------------|-------------------------|
119+
| `channelPrefix` | The prefix of the notification channel. | `socket.io` |
120+
| `tableName` | The name of the table for payloads over the 8000 bytes limit or containing binary data. | `socket_io_attachments` |
121+
| `payloadThreshold` | The threshold for the payload size in bytes (see https://www.postgresql.org/docs/current/sql-notify.html). | `8_000` |
122+
| `cleanupInterval` | Number of ms between two cleanup queries. | `30_000` |
123+
| `heartbeatInterval` | The number of ms between two heartbeats. | `5_000` |
124+
| `heartbeatTimeout` | The number of ms without heartbeat before we consider a node down. | `10_000` |
125+
65126
## License
66127

67128
[MIT](LICENSE)

lib/adapter.ts

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import { type Pool } from "pg";
2+
import { ClusterAdapterWithHeartbeat } from "socket.io-adapter";
3+
import type {
4+
ClusterAdapterOptions,
5+
ClusterMessage,
6+
ClusterResponse,
7+
Offset,
8+
ServerId,
9+
} from "socket.io-adapter";
10+
import debugModule from "debug";
11+
import { PubSubClient } from "./util";
12+
13+
const debug = debugModule("socket.io-postgres-adapter");
14+
15+
export interface PostgresAdapterOptions {
16+
/**
17+
* The prefix of the notification channel
18+
* @default "socket.io"
19+
*/
20+
channelPrefix?: string;
21+
/**
22+
* The name of the table for payloads over the 8000 bytes limit or containing binary data
23+
* @default "socket_io_attachments"
24+
*/
25+
tableName?: string;
26+
/**
27+
* The threshold for the payload size in bytes (see https://www.postgresql.org/docs/current/sql-notify.html)
28+
* @default 8000
29+
*/
30+
payloadThreshold?: number;
31+
/**
32+
* Number of ms between two cleanup queries
33+
* @default 30000
34+
*/
35+
cleanupInterval?: number;
36+
/**
37+
* Handler for errors. If undefined, the errors will be simply logged.
38+
*
39+
* @default undefined
40+
*/
41+
errorHandler?: (err: Error) => void;
42+
}
43+
44+
/**
45+
* Returns a function that will create a PostgresAdapter instance.
46+
*
47+
* @param pool - a pg.Pool instance
48+
* @param opts - additional options
49+
*
50+
* @public
51+
*/
52+
export function createAdapter(
53+
pool: Pool,
54+
opts: PostgresAdapterOptions & ClusterAdapterOptions = {}
55+
) {
56+
const options = Object.assign(
57+
{
58+
channelPrefix: "socket.io",
59+
tableName: "socket_io_attachments",
60+
payloadThreshold: 8_000,
61+
cleanupInterval: 30_000,
62+
errorHandler: (err: Error) => debug(err),
63+
},
64+
opts
65+
);
66+
67+
const namespaces = new Map<string, PostgresAdapter>();
68+
const client = new PubSubClient(
69+
pool,
70+
options,
71+
(msg) => {
72+
// @ts-expect-error uid is protected
73+
return namespaces.get(msg.nsp)?.uid === msg.uid;
74+
},
75+
(msg) => {
76+
namespaces.get(msg.nsp)?.onMessage(msg);
77+
}
78+
);
79+
80+
return function (nsp: any) {
81+
let adapter = new PostgresAdapter(nsp, opts, client);
82+
83+
namespaces.set(nsp.name, adapter);
84+
client.addNamespace(nsp.name);
85+
86+
const defaultClose = adapter.close;
87+
88+
adapter.close = () => {
89+
namespaces.delete(nsp.name);
90+
91+
if (namespaces.size === 0) {
92+
client.close();
93+
}
94+
95+
defaultClose.call(adapter);
96+
};
97+
98+
return adapter;
99+
};
100+
}
101+
102+
export class PostgresAdapter extends ClusterAdapterWithHeartbeat {
103+
/**
104+
* Adapter constructor.
105+
*
106+
* @param nsp - the namespace
107+
* @param opts - additional options
108+
* @param client
109+
*
110+
* @public
111+
*/
112+
constructor(
113+
nsp: any,
114+
opts: ClusterAdapterOptions,
115+
private readonly client: PubSubClient
116+
) {
117+
super(nsp, opts);
118+
}
119+
120+
protected override doPublish(message: ClusterMessage): Promise<Offset> {
121+
return this.client.publish(message).then(() => {
122+
// connection state recovery is not currently supported
123+
return "";
124+
});
125+
}
126+
127+
protected override doPublishResponse(
128+
_requesterUid: ServerId,
129+
response: ClusterResponse
130+
) {
131+
return this.client.publish(response);
132+
}
133+
}

0 commit comments

Comments
 (0)