-
Notifications
You must be signed in to change notification settings - Fork 1.3k
CSHARP-5777: Avoid ThreadPool-dependent IO methods in sync API #1805
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v3.5.x
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -38,41 +38,12 @@ public static void EfficientCopyTo(this Stream input, Stream output) | |
|
|
||
| public static int Read(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken) | ||
| { | ||
| try | ||
| { | ||
| using var manualResetEvent = new ManualResetEventSlim(); | ||
| var readOperation = stream.BeginRead( | ||
| buffer, | ||
| offset, | ||
| count, | ||
| state => ((ManualResetEventSlim)state.AsyncState).Set(), | ||
| manualResetEvent); | ||
|
|
||
| if (readOperation.IsCompleted || manualResetEvent.Wait(timeout, cancellationToken)) | ||
| { | ||
| return stream.EndRead(readOperation); | ||
| } | ||
| } | ||
| catch (OperationCanceledException) | ||
| { | ||
| // Have to suppress OperationCanceledException here, it will be thrown after the stream will be disposed. | ||
| } | ||
| catch (ObjectDisposedException) | ||
| { | ||
| throw new IOException(); | ||
| } | ||
|
|
||
| try | ||
| { | ||
| stream.Dispose(); | ||
| } | ||
| catch | ||
| { | ||
| // Ignore any exceptions | ||
| } | ||
|
|
||
| cancellationToken.ThrowIfCancellationRequested(); | ||
| throw new TimeoutException(); | ||
| return UseStreamWithTimeout( | ||
| stream, | ||
| (str, state) => str.Read(state.buffer, state.offset, state.count), | ||
| (buffer, offset, count), | ||
| timeout, | ||
| cancellationToken); | ||
| } | ||
|
|
||
| public static async Task<int> ReadAsync(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken) | ||
|
|
@@ -219,43 +190,16 @@ public static async Task ReadBytesAsync(this Stream stream, byte[] destination, | |
|
|
||
| public static void Write(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken) | ||
| { | ||
| try | ||
| { | ||
| using var manualResetEvent = new ManualResetEventSlim(); | ||
| var writeOperation = stream.BeginWrite( | ||
| buffer, | ||
| offset, | ||
| count, | ||
| state => ((ManualResetEventSlim)state.AsyncState).Set(), | ||
| manualResetEvent); | ||
|
|
||
| if (writeOperation.IsCompleted || manualResetEvent.Wait(timeout, cancellationToken)) | ||
| UseStreamWithTimeout( | ||
| stream, | ||
| (str, state) => | ||
| { | ||
| stream.EndWrite(writeOperation); | ||
| return; | ||
| } | ||
| } | ||
| catch (OperationCanceledException) | ||
| { | ||
| // Have to suppress OperationCanceledException here, it will be thrown after the stream will be disposed. | ||
| } | ||
| catch (ObjectDisposedException) | ||
| { | ||
| // It's possible to get ObjectDisposedException when the connection pool was closed with interruptInUseConnections set to true. | ||
| throw new IOException(); | ||
| } | ||
|
|
||
| try | ||
| { | ||
| stream.Dispose(); | ||
| } | ||
| catch | ||
| { | ||
| // Ignore any exceptions | ||
| } | ||
|
|
||
| cancellationToken.ThrowIfCancellationRequested(); | ||
| throw new TimeoutException(); | ||
| str.Write(state.buffer, state.offset, state.count); | ||
| return true; | ||
| }, | ||
| (buffer, offset, count), | ||
| timeout, | ||
| cancellationToken); | ||
| } | ||
|
|
||
| public static async Task WriteAsync(this Stream stream, byte[] buffer, int offset, int count, TimeSpan timeout, CancellationToken cancellationToken) | ||
|
|
@@ -325,5 +269,89 @@ public static async Task WriteBytesAsync(this Stream stream, OperationContext op | |
| count -= bytesToWrite; | ||
| } | ||
| } | ||
|
|
||
| private static TResult UseStreamWithTimeout<TResult, TState>(Stream stream, Func<Stream, TState, TResult> method, TState state, TimeSpan timeout, CancellationToken cancellationToken) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe just |
||
| { | ||
| StreamDisposeCallbackState callbackState = null; | ||
| Timer timer = null; | ||
| CancellationTokenRegistration cancellationSubscription = default; | ||
| if (timeout != Timeout.InfiniteTimeSpan) | ||
| { | ||
| callbackState = new StreamDisposeCallbackState(stream); | ||
| timer = new Timer(DisposeStreamCallback, callbackState, timeout, Timeout.InfiniteTimeSpan); | ||
| } | ||
|
|
||
| if (cancellationToken.CanBeCanceled) | ||
| { | ||
| callbackState ??= new StreamDisposeCallbackState(stream); | ||
| cancellationSubscription = cancellationToken.Register(DisposeStreamCallback, callbackState); | ||
| } | ||
|
|
||
| try | ||
| { | ||
| var result = method(stream, state); | ||
| if (callbackState?.TryChangeState(OperationState.Done) == false) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe |
||
| { | ||
| // if cannot change the state - then the stream was/will be disposed, throw here | ||
| throw new IOException(); | ||
| } | ||
|
|
||
| return result; | ||
| } | ||
| catch (IOException) | ||
| { | ||
| if (callbackState?.OperationState == OperationState.Cancelled) | ||
| { | ||
| cancellationToken.ThrowIfCancellationRequested(); | ||
| throw new TimeoutException(); | ||
| } | ||
|
|
||
| throw; | ||
| } | ||
| finally | ||
| { | ||
| timer?.Dispose(); | ||
| cancellationSubscription.Dispose(); | ||
| } | ||
|
|
||
| static void DisposeStreamCallback(object state) | ||
| { | ||
| var disposeCallbackState = (StreamDisposeCallbackState)state; | ||
| if (!disposeCallbackState.TryChangeState(OperationState.Cancelled)) | ||
| { | ||
| // if cannot change the state - then I/O was already succeeded | ||
| return; | ||
| } | ||
|
|
||
| try | ||
| { | ||
| disposeCallbackState.Stream.Dispose(); | ||
| } | ||
| catch (Exception) | ||
| { | ||
| // callbacks should not fail, suppress any exceptions here | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private record StreamDisposeCallbackState(Stream Stream) | ||
| { | ||
| private int _operationState = 0; | ||
|
|
||
| public OperationState OperationState | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. public OperationState OperationState => (OperationState)_operationState; |
||
| { | ||
| get => (OperationState)_operationState; | ||
| } | ||
|
|
||
| public bool TryChangeState(OperationState newState) => | ||
| Interlocked.CompareExchange(ref _operationState, (int)newState, (int)OperationState.InProgress) == (int)OperationState.InProgress; | ||
| } | ||
|
|
||
| private enum OperationState | ||
| { | ||
| InProgress = 0, | ||
| Done, | ||
| Cancelled, | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few naming suggestions:
UseStreamWithTimeout-> rename toExecuteOperationWithTimeout?method-> rename tooperation?