1- import pg , { PoolConfig } from 'pg'
2- import { DatabaseError } from 'pg-protocol'
1+ import pg from 'pg'
32import { parse as parseArray } from 'postgres-array'
4- import { PostgresMetaResult } from './types.js'
3+ import { PostgresMetaResult , PoolConfig } from './types.js'
54
65pg . types . setTypeParser ( pg . types . builtins . INT8 , ( x ) => {
76 const asNumber = Number ( x )
@@ -21,6 +20,42 @@ pg.types.setTypeParser(1185, parseArray) // _timestamptz
2120pg . types . setTypeParser ( 600 , ( x ) => x ) // point
2221pg . types . setTypeParser ( 1017 , ( x ) => x ) // _point
2322
23+ // Ensure any query will have an appropriate error handler on the pool to prevent connections errors
24+ // to bubble up all the stack eventually killing the server
25+ const poolerQueryHandleError = ( pgpool : pg . Pool , sql : string ) : Promise < pg . QueryResult < any > > => {
26+ return new Promise ( ( resolve , reject ) => {
27+ let rejected = false
28+ const connectionErrorHandler = ( err : any ) => {
29+ // If the error hasn't already be propagated to the catch
30+ if ( ! rejected ) {
31+ // This is a trick to wait for the next tick, leaving a chance for handled errors such as
32+ // RESULT_SIZE_LIMIT to take over other stream errors such as `unexpected commandComplete message`
33+ setTimeout ( ( ) => {
34+ rejected = true
35+ return reject ( err )
36+ } )
37+ }
38+ }
39+ // This listened avoid getting uncaught exceptions for errors happening at connection level within the stream
40+ // such as parse or RESULT_SIZE_EXCEEDED errors instead, handle the error gracefully by bubbling in up to the caller
41+ pgpool . once ( 'error' , connectionErrorHandler )
42+ pgpool
43+ . query ( sql )
44+ . then ( ( results : pg . QueryResult < any > ) => {
45+ if ( ! rejected ) {
46+ return resolve ( results )
47+ }
48+ } )
49+ . catch ( ( err : any ) => {
50+ // If the error hasn't already be handled within the error listener
51+ if ( ! rejected ) {
52+ rejected = true
53+ return reject ( err )
54+ }
55+ } )
56+ } )
57+ }
58+
2459export const init : ( config : PoolConfig ) => {
2560 query : ( sql : string ) => Promise < PostgresMetaResult < any > >
2661 end : ( ) => Promise < void >
@@ -60,26 +95,27 @@ export const init: (config: PoolConfig) => {
6095 // compromise: if we run `query` after `pool.end()` is called (i.e. pool is
6196 // `null`), we temporarily create a pool and close it right after.
6297 let pool : pg . Pool | null = new pg . Pool ( config )
98+
6399 return {
64100 async query ( sql ) {
65101 try {
66102 if ( ! pool ) {
67103 const pool = new pg . Pool ( config )
68- let res = await pool . query ( sql )
104+ let res = await poolerQueryHandleError ( pool , sql )
69105 if ( Array . isArray ( res ) ) {
70106 res = res . reverse ( ) . find ( ( x ) => x . rows . length !== 0 ) ?? { rows : [ ] }
71107 }
72108 await pool . end ( )
73109 return { data : res . rows , error : null }
74110 }
75111
76- let res = await pool . query ( sql )
112+ let res = await poolerQueryHandleError ( pool , sql )
77113 if ( Array . isArray ( res ) ) {
78114 res = res . reverse ( ) . find ( ( x ) => x . rows . length !== 0 ) ?? { rows : [ ] }
79115 }
80116 return { data : res . rows , error : null }
81117 } catch ( error : any ) {
82- if ( error instanceof DatabaseError ) {
118+ if ( error . constructor . name === ' DatabaseError' ) {
83119 // Roughly based on:
84120 // - https://github.com/postgres/postgres/blob/fc4089f3c65a5f1b413a3299ba02b66a8e5e37d0/src/interfaces/libpq/fe-protocol3.c#L1018
85121 // - https://github.com/brianc/node-postgres/blob/b1a8947738ce0af004cb926f79829bb2abc64aa6/packages/pg/lib/native/query.js#L33
@@ -146,17 +182,42 @@ ${' '.repeat(5 + lineNumber.toString().length + 2 + lineOffset)}^
146182 } ,
147183 }
148184 }
149-
150- return { data : null , error : { message : error . message } }
185+ try {
186+ // Handle stream errors and result size exceeded errors
187+ if ( error . code === 'RESULT_SIZE_EXCEEDED' ) {
188+ // Force kill the connection without waiting for graceful shutdown
189+ return {
190+ data : null ,
191+ error : {
192+ message : `Query result size (${ error . resultSize } bytes) exceeded the configured limit (${ error . maxResultSize } bytes)` ,
193+ code : error . code ,
194+ resultSize : error . resultSize ,
195+ maxResultSize : error . maxResultSize ,
196+ } ,
197+ }
198+ }
199+ return { data : null , error : { code : error . code , message : error . message } }
200+ } finally {
201+ // If the error isn't a "DatabaseError" assume it's a connection related we kill the connection
202+ // To attempt a clean reconnect on next try
203+ await this . end ( )
204+ }
151205 }
152206 } ,
153207
154208 async end ( ) {
155- const _pool = pool
156- pool = null
157- // Gracefully wait for active connections to be idle, then close all
158- // connections in the pool.
159- if ( _pool ) await _pool . end ( )
209+ try {
210+ const _pool = pool
211+ pool = null
212+ // Gracefully wait for active connections to be idle, then close all
213+ // connections in the pool.
214+ if ( _pool ) {
215+ await _pool . end ( )
216+ }
217+ } catch ( endError ) {
218+ // Ignore any errors during cleanup just log them
219+ console . error ( 'Failed ending connection pool' , endError )
220+ }
160221 } ,
161222 }
162223}
0 commit comments