1+ import NIO
2+
3+ #if compiler(>=6.0)
4+ /// Handle to send binary data for a `COPY ... FROM STDIN` query to the backend.
5+ ///
6+ /// It takes care of serializing `PostgresEncodable` column types into the binary format that Postgres expects.
7+ public struct PostgresBinaryCopyFromWriter : ~ Copyable {
8+ /// Handle to serialize columns into a row that is being written by `PostgresBinaryCopyFromWriter`.
9+ public struct ColumnWriter : ~ Copyable {
10+ /// The `PostgresBinaryCopyFromWriter` that is gathering the serialized data.
11+ ///
12+ /// We need to model this as `UnsafeMutablePointer` because we can't express in the Swift type system that
13+ /// `ColumnWriter` never exceeds the lifetime of `PostgresBinaryCopyFromWriter`.
14+ @usableFromInline
15+ let underlying : UnsafeMutablePointer < PostgresBinaryCopyFromWriter >
16+
17+ /// The number of columns that have been written by this `ColumnWriter`.
18+ @usableFromInline
19+ var columns : UInt16 = 0
20+
21+ @usableFromInline
22+ init ( underlying: UnsafeMutablePointer < PostgresBinaryCopyFromWriter > ) {
23+ self . underlying = underlying
24+ }
25+
26+ /// Serialize a single column to a row.
27+ ///
28+ /// - Important: It is critical that that data type encoded here exactly matches the data type in the
29+ /// databasse. For example, if the database stores an a 4-bit integer the corresponding `writeColumn` must
30+ /// be called with an `Int32`. Serializing an integer of a different width will cause a deserialization
31+ /// failure in the backend.
32+ @inlinable
33+ public mutating func writeColumn( _ column: ( some PostgresEncodable ) ? ) throws {
34+ columns += 1
35+ try underlying. pointee. writeColumn ( column)
36+ }
37+ }
38+
39+ /// The underlying `PostgresCopyFromWriter` that sends the serialized data to the backend.
40+ @usableFromInline let underlying : PostgresCopyFromWriter
41+
42+ /// The buffer in which we accumulate binary data. Once this buffer exceeds `bufferSize`, we flush it to
43+ /// the backend.
44+ @usableFromInline var buffer = ByteBuffer ( )
45+
46+ /// Once `buffer` exceeds this size, it gets flushed to the backend.
47+ @usableFromInline let bufferSize : Int
48+
49+ init ( underlying: PostgresCopyFromWriter , bufferSize: Int ) {
50+ self . underlying = underlying
51+ // Allocate 10% more than the buffer size because we only flush the buffer once it has exceeded `bufferSize`
52+ buffer. reserveCapacity ( bufferSize + bufferSize / 10 )
53+ self . bufferSize = bufferSize
54+ }
55+
56+ /// Serialize a single row to the backend. Call `writeColumn` on `columnWriter` for every column that should be
57+ /// included in the row.
58+ @inlinable
59+ public mutating func writeRow( _ body: ( _ columnWriter: inout ColumnWriter ) throws -> Void ) async throws {
60+ // Write a placeholder for the number of columns
61+ let columnIndex = buffer. writerIndex
62+ buffer. writeInteger ( UInt16 ( 0 ) )
63+
64+ let columns = try withUnsafeMutablePointer ( to: & self ) { pointerToSelf in
65+ // Important: We need to ensure that `pointerToSel` (and thus `ColumnWriter`) does not exceed the lifetime
66+ // of `self` because it is holding an unsafe reference to it.
67+ //
68+ // We achieve this because `ColumnWriter` is non-Copyable and thus the client can't store a copy to it.
69+ // Futhermore `columnWriter` is destroyed before the end of `withUnsafeMutablePointer`, which holds `self`
70+ // alive.
71+ var columnWriter = ColumnWriter ( underlying: pointerToSelf)
72+
73+ try body ( & columnWriter)
74+
75+ return columnWriter. columns
76+ }
77+
78+ // Fill in the number of columns
79+ buffer. setInteger ( columns, at: columnIndex)
80+
81+ if buffer. readableBytes > bufferSize {
82+ try await flush ( )
83+ }
84+ }
85+
86+ /// Serialize a single column to the buffer. Should only be called by `ColumnWriter`.
87+ @inlinable
88+ mutating func writeColumn( _ column: ( some PostgresEncodable ) ? ) throws {
89+ if let column {
90+ let sizeIndex = buffer. readableBytes
91+ buffer. writeInteger ( Int32 ( 0 ) )
92+ try column. encode ( into: & buffer, context: . default)
93+ buffer. setInteger ( Int32 ( buffer. readableBytes - sizeIndex - 4 ) , at: sizeIndex)
94+ } else {
95+ buffer. writeInteger ( Int32 ( - 1 ) )
96+ }
97+ }
98+
99+ /// Flush any pending data in the buffer to the backend.
100+ @usableFromInline
101+ mutating func flush( isolation: ( any Actor ) ? = #isolation) async throws {
102+ try await underlying. write ( buffer)
103+ buffer. clear ( )
104+ }
105+ }
106+ #endif
107+
1108/// Handle to send data for a `COPY ... FROM STDIN` query to the backend.
2109public struct PostgresCopyFromWriter : Sendable {
3110 private let channelHandler : NIOLoopBound < PostgresChannelHandler >
@@ -115,15 +222,25 @@ public struct PostgresCopyFromFormat: Sendable {
115222 public init ( ) { }
116223 }
117224
225+ /// Options that can be used to modify the `binary` format of a COPY operation.
226+ public struct BinaryOptions : Sendable {
227+ public init ( ) { }
228+ }
229+
118230 enum Format {
119231 case text( TextOptions )
232+ case binary( BinaryOptions )
120233 }
121234
122235 var format : Format
123236
124237 public static func text( _ options: TextOptions ) -> PostgresCopyFromFormat {
125238 return PostgresCopyFromFormat ( format: . text( options) )
126239 }
240+
241+ public static func binary( _ options: BinaryOptions ) -> PostgresCopyFromFormat {
242+ return PostgresCopyFromFormat ( format: . binary( options) )
243+ }
127244}
128245
129246#if compiler(>=6.0)
@@ -156,6 +273,8 @@ private func buildCopyFromQuery(
156273 // Set the delimiter as a Unicode code point. This avoids the possibility of SQL injection.
157274 queryOptions. append ( " DELIMITER U&' \\ \( String ( format: " %04x " , delimiter. value) ) ' " )
158275 }
276+ case . binary:
277+ queryOptions. append ( " FORMAT binary " )
159278 }
160279 precondition ( !queryOptions. isEmpty)
161280 query += " WITH ( "
@@ -165,6 +284,50 @@ private func buildCopyFromQuery(
165284}
166285
167286extension PostgresConnection {
287+ /// Copy data into a table using a `COPY <table name> FROM STDIN` query, transferring data in a binary format.
288+ ///
289+ /// - Parameters:
290+ /// - table: The name of the table into which to copy the data.
291+ /// - columns: The name of the columns to copy. If an empty array is passed, all columns are assumed to be copied.
292+ /// - bufferSize: How many bytes to accumulate a local buffer before flushing it to the database. Can affect
293+ /// performance characteristics of the copy operation.
294+ /// - writeData: Closure that produces the data for the table, to be streamed to the backend. Call `write` on the
295+ /// writer provided by the closure to send data to the backend and return from the closure once all data is sent.
296+ /// Throw an error from the closure to fail the data transfer. The error thrown by the closure will be rethrown
297+ /// by the `copyFrom` function.
298+ ///
299+ /// - Important: The table and column names are inserted into the `COPY FROM` query as passed and might thus be
300+ /// susceptible to SQL injection. Ensure no untrusted data is contained in these strings.
301+ public func copyFromBinary(
302+ table: String ,
303+ columns: [ String ] = [ ] ,
304+ options: PostgresCopyFromFormat . BinaryOptions = . init( ) ,
305+ bufferSize: Int = 100_000 ,
306+ logger: Logger ,
307+ isolation: isolated ( any Actor ) ? = #isolation,
308+ file: String = #fileID,
309+ line: Int = #line,
310+ writeData: @escaping @Sendable ( inout PostgresBinaryCopyFromWriter ) async throws -> Void
311+ ) async throws {
312+ try await copyFrom ( table: table, columns: columns, format: . binary( PostgresCopyFromFormat . BinaryOptions ( ) ) , logger: logger) { writer in
313+ var header = ByteBuffer ( )
314+ header. writeString ( " PGCOPY \n " )
315+ header. writeInteger ( UInt8 ( 0xff ) )
316+ header. writeString ( " \r \n \0 " )
317+
318+ // Flag fields
319+ header. writeInteger ( UInt32 ( 0 ) )
320+
321+ // Header extension area length
322+ header. writeInteger ( UInt32 ( 0 ) )
323+ try await writer. write ( header)
324+
325+ var binaryWriter = PostgresBinaryCopyFromWriter ( underlying: writer, bufferSize: bufferSize)
326+ try await writeData ( & binaryWriter)
327+ try await binaryWriter. flush ( )
328+ }
329+ }
330+
168331 /// Copy data into a table using a `COPY <table name> FROM STDIN` query.
169332 ///
170333 /// - Parameters:
0 commit comments