1010using System . Collections . Generic ;
1111using System . Linq ;
1212using System . Text ;
13+ using System . Threading ;
1314using System . Threading . Tasks ;
1415using Xunit ;
1516
@@ -20,7 +21,7 @@ internal class OutputReader
2021 private AsyncQueue < OutputEventBody > outputQueue = new AsyncQueue < OutputEventBody > ( ) ;
2122
2223 private string currentOutputCategory ;
23- private Queue < string > bufferedOutput = new Queue < string > ( ) ;
24+ private Queue < Tuple < string , bool > > bufferedOutput = new Queue < Tuple < string , bool > > ( ) ;
2425
2526 public OutputReader ( ProtocolEndpoint protocolClient )
2627 {
@@ -31,34 +32,80 @@ public OutputReader(ProtocolEndpoint protocolClient)
3132
3233 public async Task < string > ReadLine ( string expectedOutputCategory = "stdout" )
3334 {
34- if ( this . bufferedOutput . Count > 0 )
35+ try
3536 {
36- Assert . Equal ( expectedOutputCategory , this . currentOutputCategory ) ;
37+ bool lineHasNewLine = false ;
38+ string [ ] outputLines = null ;
39+ string nextOutputString = string . Empty ;
3740
38- return this . bufferedOutput . Dequeue ( ) ;
39- }
40-
41- // Execution reaches this point if a buffered line wasn't available
42- OutputEventBody nextOutputEvent = await this . outputQueue . DequeueAsync ( ) ;
41+ // Wait no longer than 7 seconds for output to come back
42+ CancellationTokenSource cancellationSource = new CancellationTokenSource ( 7000 ) ;
4343
44- Assert . Equal ( expectedOutputCategory , nextOutputEvent . Category ) ;
45- this . currentOutputCategory = nextOutputEvent . Category ;
44+ // Any lines in the buffer?
45+ if ( this . bufferedOutput . Count > 0 )
46+ {
47+ Assert . Equal ( expectedOutputCategory , this . currentOutputCategory ) ;
4648
47- string [ ] outputLines =
48- nextOutputEvent . Output . Split (
49- new string [ ] { "\n " , "\r \n " } ,
50- StringSplitOptions . None ) ;
49+ // Return the first buffered line
50+ var lineTuple = this . bufferedOutput . Dequeue ( ) ;
51+ nextOutputString = lineTuple . Item1 ;
52+ lineHasNewLine = lineTuple . Item2 ;
53+ }
5154
52- // Buffer remaining lines
53- if ( outputLines . Length > 1 )
54- {
55- for ( int i = 1 ; i < outputLines . Length ; i ++ )
55+ // Loop until we get a full line of output
56+ while ( ! lineHasNewLine )
5657 {
57- this . bufferedOutput . Enqueue ( outputLines [ i ] ) ;
58+ // Execution reaches this point if a buffered line wasn't available
59+ Task < OutputEventBody > outputTask =
60+ this . outputQueue . DequeueAsync (
61+ cancellationSource . Token ) ;
62+ OutputEventBody nextOutputEvent = await outputTask ;
63+
64+ // Verify that the output is of the expected type
65+ Assert . Equal ( expectedOutputCategory , nextOutputEvent . Category ) ;
66+ this . currentOutputCategory = nextOutputEvent . Category ;
67+
68+ // Split up the output into multiple lines
69+ outputLines =
70+ nextOutputEvent . Output . Split (
71+ new string [ ] { "\n " , "\r \n " } ,
72+ StringSplitOptions . None ) ;
73+
74+ // Add the first bit of output to the existing string
75+ nextOutputString += outputLines [ 0 ] ;
76+
77+ // Have we found a newline now?
78+ lineHasNewLine =
79+ outputLines . Length > 1 ||
80+ nextOutputEvent . Output . EndsWith ( "\n " ) ;
81+
82+ // Buffer any remaining lines for future reads
83+ if ( outputLines . Length > 1 )
84+ {
85+ for ( int i = 1 ; i < outputLines . Length ; i ++ )
86+ {
87+ this . bufferedOutput . Enqueue (
88+ new Tuple < string , bool > (
89+ outputLines [ i ] ,
90+
91+ // The line has a newline if it's not the last segment or
92+ // if the current output string ends with a newline
93+ i < outputLines . Length - 1 ||
94+ nextOutputEvent . Output . EndsWith ( "\n " ) ) ) ;
95+ }
96+ }
97+
98+ // At this point, the state of lineHasNewLine will determine
99+ // whether the loop continues to wait for another output
100+ // event that completes the current line.
58101 }
59- }
60102
61- return outputLines [ 0 ] ;
103+ return nextOutputString ;
104+ }
105+ catch ( TaskCanceledException )
106+ {
107+ throw new TimeoutException ( "Timed out waiting for an input line." ) ;
108+ }
62109 }
63110
64111 public async Task < string [ ] > ReadLines ( int lineCount , string expectedOutputCategory = "stdout" )
0 commit comments