@@ -88,7 +88,27 @@ struct ConnectionStateMachine {
8888 case sendParseDescribeBindExecuteSync( PostgresQuery )
8989 case sendBindExecuteSync( PSQLExecuteStatement )
9090 case failQuery( EventLoopPromise < PSQLRowStream > , with: PSQLError , cleanupContext: CleanUpContext ? )
91+ /// Fail a query's execution by resuming the continuation with the given error. When `sync` is `true`, send a
92+ /// `Sync` message to the backend.
93+ case failQueryContinuation( AnyErrorContinuation , with: PSQLError , sync: Bool , cleanupContext: CleanUpContext ? )
94+ /// Fail a query's execution by resuming the continuation with the given error and send a `Sync` message to the
95+ /// backend.
9196 case succeedQuery( EventLoopPromise < PSQLRowStream > , with: QueryResult )
97+ /// Succeed the continuation with a void result. When `sync` is `true`, send a `Sync` message to the backend.
98+ case succeedQueryContinuation( CheckedContinuation < Void , any Error > , sync: Bool )
99+
100+ /// Trigger a data transfer returning a `PostgresCopyFromWriter` to the given continuation.
101+ ///
102+ /// Once the data transfer is triggered, it will send `CopyData` messages to the backend. After that the state
103+ /// machine needs to be prodded again to send a `CopyDone` or `CopyFail` by calling
104+ /// `PostgresChannelHandler.sendCopyDone` or `PostgresChannelHandler.sendCopyFail`.
105+ case triggerCopyData( CheckedContinuation < PostgresCopyFromWriter , any Error > )
106+
107+ /// Send a `CopyDone` and `Sync` message to the backend.
108+ case sendCopyDoneAndSync
109+
110+ /// Send a `CopyFail` message to the backend with the given error message.
111+ case sendCopyFail( message: String )
92112
93113 // --- streaming actions
94114 // actions if query has requested next row but we are waiting for backend
@@ -107,6 +127,14 @@ struct ConnectionStateMachine {
107127 case failClose( CloseCommandContext , with: PSQLError , cleanupContext: CleanUpContext ? )
108128 }
109129
130+ enum ChannelWritabilityChangedAction {
131+ /// No action needs to be taken based on the writability change.
132+ case none
133+
134+ /// Resume the given continuation successfully.
135+ case succeedPromise( EventLoopPromise < Void > )
136+ }
137+
110138 private var state : State
111139 private let requireBackendKeyData : Bool
112140 private var taskQueue = CircularBuffer < PSQLTask > ( )
@@ -587,6 +615,8 @@ struct ConnectionStateMachine {
587615 switch queryContext. query {
588616 case . executeStatement( _, let promise) , . unnamed( _, let promise) :
589617 return . failQuery( promise, with: psqlErrror, cleanupContext: nil )
618+ case . copyFrom( _, let triggerCopy) :
619+ return . failQueryContinuation( . copyFromWriter( triggerCopy) , with: psqlErrror, sync: false , cleanupContext: nil )
590620 case . prepareStatement( _, _, _, let promise) :
591621 return . failPreparedStatementCreation( promise, with: psqlErrror, cleanupContext: nil )
592622 }
@@ -660,6 +690,16 @@ struct ConnectionStateMachine {
660690 preconditionFailure ( " Invalid state: \( self . state) " )
661691 }
662692 }
693+
694+ mutating func channelWritabilityChanged( isWritable: Bool ) -> ChannelWritabilityChangedAction {
695+ guard case . extendedQuery( var queryState, let connectionContext) = state else {
696+ return . none
697+ }
698+ self . state = . modifying // avoid CoW
699+ let action = queryState. channelWritabilityChanged ( isWritable: isWritable)
700+ self . state = . extendedQuery( queryState, connectionContext)
701+ return action
702+ }
663703
664704 // MARK: - Running Queries -
665705
@@ -752,10 +792,55 @@ struct ConnectionStateMachine {
752792 return self . modify ( with: action)
753793 }
754794
755- mutating func copyInResponseReceived(
756- _ copyInResponse: PostgresBackendMessage . CopyInResponse
757- ) -> ConnectionAction {
758- return self . closeConnectionAndCleanup ( . unexpectedBackendMessage( . copyInResponse( copyInResponse) ) )
795+ mutating func copyInResponseReceived( _ copyInResponse: PostgresBackendMessage . CopyInResponse ) -> ConnectionAction {
796+ guard case . extendedQuery( var queryState, let connectionContext) = self . state, !queryState. isComplete else {
797+ return self . closeConnectionAndCleanup ( . unexpectedBackendMessage( . copyInResponse( copyInResponse) ) )
798+ }
799+
800+ self . state = . modifying // avoid CoW
801+ let action = queryState. copyInResponseReceived ( copyInResponse)
802+ self . state = . extendedQuery( queryState, connectionContext)
803+ return self . modify ( with: action)
804+ }
805+
806+
807+ /// Succeed the promise when the channel to the backend is writable and the backend is ready to receive more data.
808+ ///
809+ /// The promise may be failed if the backend indicated that it can't handle any more data by sending an
810+ /// `ErrorResponse`. This is mostly the case when malformed data is sent to it. In that case, the data transfer
811+ /// should be aborted to avoid unnecessary work.
812+ mutating func checkBackendCanReceiveCopyData( channelIsWritable: Bool , promise: EventLoopPromise < Void > ) {
813+ guard case . extendedQuery( var queryState, let connectionContext) = self . state else {
814+ preconditionFailure ( " Copy mode is only supported for extended queries " )
815+ }
816+
817+ self . state = . modifying // avoid CoW
818+ queryState. checkBackendCanReceiveCopyData ( channelIsWritable: channelIsWritable, promise: promise)
819+ self . state = . extendedQuery( queryState, connectionContext)
820+ }
821+
822+ /// Put the state machine out of the copying mode and send a `CopyDone` message to the backend.
823+ mutating func sendCopyDone( continuation: CheckedContinuation < Void , any Error > ) -> ConnectionAction {
824+ guard case . extendedQuery( var queryState, let connectionContext) = self . state else {
825+ preconditionFailure ( " Copy mode is only supported for extended queries " )
826+ }
827+
828+ self . state = . modifying // avoid CoW
829+ let action = queryState. sendCopyDone ( continuation: continuation)
830+ self . state = . extendedQuery( queryState, connectionContext)
831+ return self . modify ( with: action)
832+ }
833+
834+ /// Put the state machine out of the copying mode and send a `CopyFail` message to the backend.
835+ mutating func sendCopyFail( message: String , continuation: CheckedContinuation < Void , any Error > ) -> ConnectionAction {
836+ guard case . extendedQuery( var queryState, let connectionContext) = self . state else {
837+ preconditionFailure ( " Copy mode is only supported for extended queries " )
838+ }
839+
840+ self . state = . modifying // avoid CoW
841+ let action = queryState. sendCopyFail ( message: message, continuation: continuation)
842+ self . state = . extendedQuery( queryState, connectionContext)
843+ return self . modify ( with: action)
759844 }
760845
761846 mutating func emptyQueryResponseReceived( ) -> ConnectionAction {
@@ -866,14 +951,21 @@ struct ConnectionStateMachine {
866951 . forwardRows,
867952 . forwardStreamComplete,
868953 . wait,
869- . read:
954+ . read,
955+ . triggerCopyData,
956+ . sendCopyDoneAndSync,
957+ . sendCopyFail,
958+ . succeedQueryContinuation:
870959 preconditionFailure ( " Invalid query state machine action in state: \( self . state) , action: \( action) " )
871960
872961 case . evaluateErrorAtConnectionLevel:
873962 return . closeConnectionAndCleanup( cleanupContext)
874963
875- case . failQuery( let queryContext, with: let error) :
876- return . failQuery( queryContext, with: error, cleanupContext: cleanupContext)
964+ case . failQuery( let promise, with: let error) :
965+ return . failQuery( promise, with: error, cleanupContext: cleanupContext)
966+
967+ case . failQueryContinuation( let continuation, with: let error, let sync) :
968+ return . failQueryContinuation( continuation, with: error, sync: sync, cleanupContext: cleanupContext)
877969
878970 case . forwardStreamError( let error, let read) :
879971 return . forwardStreamError( error, read: read, cleanupContext: cleanupContext)
@@ -1044,8 +1136,19 @@ extension ConnectionStateMachine {
10441136 case . failQuery( let requestContext, with: let error) :
10451137 let cleanupContext = self . setErrorAndCreateCleanupContextIfNeeded ( error)
10461138 return . failQuery( requestContext, with: error, cleanupContext: cleanupContext)
1139+ case . failQueryContinuation( let continuation, with: let error, let sync) :
1140+ let cleanupContext = self . setErrorAndCreateCleanupContextIfNeeded ( error)
1141+ return . failQueryContinuation( continuation, with: error, sync: sync, cleanupContext: cleanupContext)
10471142 case . succeedQuery( let requestContext, with: let result) :
10481143 return . succeedQuery( requestContext, with: result)
1144+ case . succeedQueryContinuation( let continuation, let sync) :
1145+ return . succeedQueryContinuation( continuation, sync: sync)
1146+ case . triggerCopyData( let triggerCopy) :
1147+ return . triggerCopyData( triggerCopy)
1148+ case . sendCopyDoneAndSync:
1149+ return . sendCopyDoneAndSync
1150+ case . sendCopyFail( message: let message) :
1151+ return . sendCopyFail( message: message)
10491152 case . forwardRows( let buffer) :
10501153 return . forwardRows( buffer)
10511154 case . forwardStreamComplete( let buffer, let commandTag) :
0 commit comments