2323import Glibc
2424#elseif canImport(Android)
2525import Android
26+ import posix_filesystem. sys_epoll
2627#elseif canImport(Musl)
2728import Musl
2829#endif
@@ -40,7 +41,7 @@ final class AsyncIO: Sendable {
4041
4142 typealias OutputStream = AsyncThrowingStream < AsyncBufferSequence . Buffer , any Error >
4243
43- private final class MonitorThreadContext {
44+ private struct MonitorThreadContext : Sendable {
4445 let epollFileDescriptor : CInt
4546 let shutdownFileDescriptor : CInt
4647
@@ -95,7 +96,7 @@ final class AsyncIO: Sendable {
9596 events: EPOLLIN . rawValue,
9697 data: epoll_data ( fd: shutdownFileDescriptor)
9798 )
98- var rc = epoll_ctl (
99+ let rc = epoll_ctl (
99100 epollFileDescriptor,
100101 EPOLL_CTL_ADD,
101102 shutdownFileDescriptor,
@@ -117,76 +118,71 @@ final class AsyncIO: Sendable {
117118 epollFileDescriptor: epollFileDescriptor,
118119 shutdownFileDescriptor: shutdownFileDescriptor
119120 )
120- let threadContext = Unmanaged . passRetained ( context)
121- var thread : pthread_t = pthread_t ( )
122- rc = pthread_create ( & thread, nil , { args in
123- func reportError( _ error: SubprocessError ) {
124- _registration. withLock { store in
125- for continuation in store. values {
126- continuation. finish ( throwing: error)
121+ let thread : pthread_t
122+ do {
123+ thread = try pthread_create {
124+ func reportError( _ error: SubprocessError ) {
125+ _registration. withLock { store in
126+ for continuation in store. values {
127+ continuation. finish ( throwing: error)
128+ }
127129 }
128130 }
129- }
130-
131- let unmanaged = Unmanaged< MonitorThreadContext> . fromOpaque( args!)
132- let context = unmanaged. takeRetainedValue ( )
133-
134- var events : [ epoll_event ] = Array (
135- repeating: epoll_event ( events: 0 , data: epoll_data ( fd: 0 ) ) ,
136- count: _epollEventSize
137- )
138131
139- // Enter the monitor loop
140- monitorLoop: while true {
141- let eventCount = epoll_wait (
142- context. epollFileDescriptor,
143- & events,
144- CInt ( events. count) ,
145- - 1
132+ var events : [ epoll_event ] = Array (
133+ repeating: epoll_event ( events: 0 , data: epoll_data ( fd: 0 ) ) ,
134+ count: _epollEventSize
146135 )
147- if eventCount < 0 {
148- if errno == EINTR || errno == EAGAIN {
149- continue // interrupted by signal; try again
150- }
151- // Report other errors
152- let error = SubprocessError (
153- code: . init( . asyncIOFailed(
154- " epoll_wait failed " )
155- ) ,
156- underlyingError: . init( rawValue: errno)
157- )
158- reportError ( error)
159- break monitorLoop
160- }
161136
162- for index in 0 ..< Int ( eventCount) {
163- let event = events [ index]
164- let targetFileDescriptor = event. data. fd
165- // Breakout the monitor loop if we received shutdown
166- // from the shutdownFD
167- if targetFileDescriptor == context. shutdownFileDescriptor {
168- var buf : UInt64 = 0
169- _ = _SubprocessCShims. read ( context. shutdownFileDescriptor, & buf, MemoryLayout< UInt64> . size)
137+ // Enter the monitor loop
138+ monitorLoop: while true {
139+ let eventCount = epoll_wait (
140+ context. epollFileDescriptor,
141+ & events,
142+ CInt ( events. count) ,
143+ - 1
144+ )
145+ if eventCount < 0 {
146+ if errno == EINTR || errno == EAGAIN {
147+ continue // interrupted by signal; try again
148+ }
149+ // Report other errors
150+ let error = SubprocessError (
151+ code: . init( . asyncIOFailed(
152+ " epoll_wait failed " )
153+ ) ,
154+ underlyingError: . init( rawValue: errno)
155+ )
156+ reportError ( error)
170157 break monitorLoop
171158 }
172159
173- // Notify the continuation
174- let continuation = _registration. withLock { store -> SignalStream . Continuation ? in
175- if let continuation = store [ targetFileDescriptor] {
176- return continuation
160+ for index in 0 ..< Int ( eventCount) {
161+ let event = events [ index]
162+ let targetFileDescriptor = event. data. fd
163+ // Breakout the monitor loop if we received shutdown
164+ // from the shutdownFD
165+ if targetFileDescriptor == context. shutdownFileDescriptor {
166+ var buf : UInt64 = 0
167+ _ = _subprocess_read ( context. shutdownFileDescriptor, & buf, MemoryLayout< UInt64> . size)
168+ break monitorLoop
177169 }
178- return nil
170+
171+ // Notify the continuation
172+ let continuation = _registration. withLock { store -> SignalStream . Continuation ? in
173+ if let continuation = store [ targetFileDescriptor] {
174+ return continuation
175+ }
176+ return nil
177+ }
178+ continuation? . yield ( true )
179179 }
180- continuation? . yield ( true )
181180 }
182181 }
183-
184- return nil
185- } , threadContext. toOpaque ( ) )
186- guard rc == 0 else {
182+ } catch let underlyingError {
187183 let error = SubprocessError (
188184 code: . init( . asyncIOFailed( " Failed to create monitor thread " ) ) ,
189- underlyingError: . init ( rawValue : rc )
185+ underlyingError: underlyingError
190186 )
191187 self . state = . failure( error)
192188 return
@@ -211,14 +207,14 @@ final class AsyncIO: Sendable {
211207
212208 var one : UInt64 = 1
213209 // Wake up the thread for shutdown
214- _ = _SubprocessCShims . write ( currentState. shutdownFileDescriptor, & one, MemoryLayout< UInt64> . stride)
210+ _ = _subprocess_write ( currentState. shutdownFileDescriptor, & one, MemoryLayout< UInt64> . stride)
215211 // Cleanup the monitor thread
216212 pthread_join ( currentState. monitorThread, nil )
217213 var closeError : CInt = 0
218- if _SubprocessCShims . close ( currentState. epollFileDescriptor) != 0 {
214+ if _subprocess_close ( currentState. epollFileDescriptor) != 0 {
219215 closeError = errno
220216 }
221- if _SubprocessCShims . close ( currentState. shutdownFileDescriptor) != 0 {
217+ if _subprocess_close ( currentState. shutdownFileDescriptor) != 0 {
222218 closeError = errno
223219 }
224220 if closeError != 0 {
@@ -231,7 +227,7 @@ final class AsyncIO: Sendable {
231227 _ fileDescriptor: FileDescriptor ,
232228 for event: Event
233229 ) -> SignalStream {
234- return SignalStream { continuation in
230+ return SignalStream { ( continuation: SignalStream . Continuation ) -> ( ) in
235231 // If setup failed, nothing much we can do
236232 switch self . state {
237233 case . success( let state) :
@@ -261,9 +257,9 @@ final class AsyncIO: Sendable {
261257 let targetEvent : EPOLL_EVENTS
262258 switch event {
263259 case . read:
264- targetEvent = EPOLLIN
260+ targetEvent = EPOLL_EVENTS ( EPOLLIN)
265261 case . write:
266- targetEvent = EPOLLOUT
262+ targetEvent = EPOLL_EVENTS ( EPOLLOUT)
267263 }
268264
269265 var event = epoll_event (
@@ -369,7 +365,7 @@ extension AsyncIO {
369365 let offsetAddress = bufferPointer. baseAddress!. advanced ( by: readLength)
370366
371367 // Read directly into the buffer at the offset
372- return _SubprocessCShims . read ( fileDescriptor. rawValue, offsetAddress, targetCount)
368+ return _subprocess_read ( fileDescriptor. rawValue, offsetAddress, targetCount)
373369 }
374370 if bytesRead > 0 {
375371 // Read some data
@@ -435,7 +431,7 @@ extension AsyncIO {
435431 let written = bytes. withUnsafeBytes { ptr in
436432 let remainingLength = ptr. count - writtenLength
437433 let startPtr = ptr. baseAddress!. advanced ( by: writtenLength)
438- return _SubprocessCShims . write ( fileDescriptor. rawValue, startPtr, remainingLength)
434+ return _subprocess_write ( fileDescriptor. rawValue, startPtr, remainingLength)
439435 }
440436 if written > 0 {
441437 writtenLength += written
@@ -478,7 +474,7 @@ extension AsyncIO {
478474 let written = span. withUnsafeBytes { ptr in
479475 let remainingLength = ptr. count - writtenLength
480476 let startPtr = ptr. baseAddress!. advanced ( by: writtenLength)
481- return _SubprocessCShims . write ( fileDescriptor. rawValue, startPtr, remainingLength)
477+ return _subprocess_write ( fileDescriptor. rawValue, startPtr, remainingLength)
482478 }
483479 if written > 0 {
484480 writtenLength += written
0 commit comments