Skip to content

Commit a94ba78

Browse files
committed
[#2640] Update CancelSignalTest
I don't know if this test actually makes sense: the cancel opertion happens in a different thread that causes some error when calling session.close because the thread doesn't match the one the session was open. I'm going to keep it because it was introduced to solve #1436 but we might need to review it again in the future.
1 parent 0da651f commit a94ba78

File tree

1 file changed

+55
-49
lines changed

1 file changed

+55
-49
lines changed

hibernate-reactive-core/src/test/java/org/hibernate/reactive/CancelSignalTest.java

Lines changed: 55 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,19 @@
55
*/
66
package org.hibernate.reactive;
77

8+
import java.util.ArrayList;
89
import java.util.Collection;
910
import java.util.List;
1011
import java.util.Objects;
1112
import java.util.Queue;
1213
import java.util.concurrent.CompletableFuture;
14+
import java.util.concurrent.CompletionStage;
1315
import java.util.concurrent.ConcurrentLinkedQueue;
1416
import java.util.concurrent.CountDownLatch;
1517
import java.util.concurrent.ExecutorService;
1618
import java.util.concurrent.Executors;
17-
import java.util.stream.IntStream;
19+
20+
1821

1922
import org.junit.jupiter.api.Test;
2023

@@ -27,73 +30,76 @@
2730
import jakarta.persistence.Id;
2831
import jakarta.persistence.Table;
2932

30-
import static java.util.Arrays.stream;
3133
import static java.util.concurrent.CompletableFuture.allOf;
3234
import static java.util.concurrent.CompletableFuture.runAsync;
33-
import static java.util.stream.Stream.concat;
3435
import static org.assertj.core.api.Assertions.assertThat;
36+
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
3537

3638
public class CancelSignalTest extends BaseReactiveTest {
3739
private static final Logger LOG = Logger.getLogger( CancelSignalTest.class );
3840

41+
private static final int EXECUTION_SIZE = 10;
42+
3943
@Override
4044
protected Collection<Class<?>> annotatedEntities() {
4145
return List.of( GuineaPig.class );
4246
}
4347

48+
@Override
49+
public CompletionStage<Void> deleteEntities(Class<?>... entities) {
50+
// We don't need to delete anything
51+
return voidFuture();
52+
}
53+
4454
@Test
4555
public void cleanupConnectionWhenCancelSignal(VertxTestContext context) {
4656
// larger than 'sql pool size' to check entering the 'pool waiting queue'
47-
int executeSize = 10;
4857
CountDownLatch firstSessionWaiter = new CountDownLatch( 1 );
4958
Queue<Cancellable> cancellableQueue = new ConcurrentLinkedQueue<>();
5059

51-
ExecutorService withSessionExecutor = Executors.newFixedThreadPool( executeSize );
52-
// Create some jobs that are going to be cancelled asynchronously
53-
CompletableFuture[] withSessionFutures = IntStream
54-
.range( 0, executeSize )
55-
.mapToObj( i -> runAsync(
56-
() -> {
57-
CountDownLatch countDownLatch = new CountDownLatch( 1 );
58-
Cancellable cancellable = getMutinySessionFactory()
59-
.withSession( s -> {
60-
LOG.debug( "start withSession: " + i );
61-
sleep( 100 );
62-
firstSessionWaiter.countDown();
63-
return s.find( GuineaPig.class, 1 );
64-
} )
65-
.onTermination().invoke( () -> {
66-
countDownLatch.countDown();
67-
LOG.debug( "future " + i + " terminated" );
68-
} )
69-
.subscribe().with( item -> LOG.debug( "end withSession: " + i ) );
70-
cancellableQueue.add( cancellable );
71-
await( countDownLatch );
72-
},
73-
withSessionExecutor
74-
) )
75-
.toArray( CompletableFuture[]::new );
76-
77-
// Create jobs that are going to cancel the previous ones
78-
ExecutorService cancelExecutor = Executors.newFixedThreadPool( executeSize );
79-
CompletableFuture[] cancelFutures = IntStream
80-
.range( 0, executeSize )
81-
.mapToObj( i -> runAsync(
82-
() -> {
83-
await( firstSessionWaiter );
84-
cancellableQueue.poll().cancel();
85-
sleep( 500 );
86-
},
87-
cancelExecutor
88-
) )
89-
.toArray( CompletableFuture[]::new );
90-
91-
CompletableFuture<Void> allFutures = allOf( concat( stream( withSessionFutures ), stream( cancelFutures ) )
92-
.toArray( CompletableFuture[]::new )
93-
);
60+
final List<CompletableFuture<?>> allFutures = new ArrayList<>();
61+
62+
ExecutorService withSessionExecutor = Executors.newFixedThreadPool( EXECUTION_SIZE );
63+
for ( int j = 0; j < EXECUTION_SIZE; j++ ) {
64+
final int i = j;
65+
allFutures.add( runAsync( () -> {
66+
CountDownLatch countDownLatch = new CountDownLatch( 1 );
67+
Cancellable cancellable = getMutinySessionFactory()
68+
.withSession( s -> {
69+
LOG.info( "start withSession: " + i );
70+
sleep( 100 );
71+
firstSessionWaiter.countDown();
72+
return s.find( GuineaPig.class, 1 );
73+
} )
74+
.onCancellation().invoke( () -> {
75+
LOG.info( "future " + i + " cancelled" );
76+
countDownLatch.countDown();
77+
} )
78+
.subscribe()
79+
// We cancelled the job, it shouldn't really finish
80+
.with( item -> LOG.info( "end withSession: " + i ) );
81+
cancellableQueue.add( cancellable );
82+
await( countDownLatch );
83+
},
84+
withSessionExecutor
85+
) );
86+
}
9487

95-
// Test that there shouldn't be any pending process
96-
test( context, allFutures.thenAccept( x -> assertThat( sqlPendingMetric() ).isEqualTo( 0.0 ) ) );
88+
ExecutorService cancelExecutor = Executors.newFixedThreadPool( EXECUTION_SIZE );
89+
for ( int i = 0; i < EXECUTION_SIZE; i++ ) {
90+
allFutures.add( runAsync( () -> {
91+
await( firstSessionWaiter );
92+
cancellableQueue.poll().cancel();
93+
sleep( 500 );
94+
},
95+
cancelExecutor
96+
) );
97+
}
98+
99+
test(
100+
context, allOf( allFutures.toArray( new CompletableFuture<?>[0] ) )
101+
.thenAccept( x -> assertThat( sqlPendingMetric() ).isEqualTo( 0.0 ) )
102+
);
97103
}
98104

99105
private static double sqlPendingMetric() {

0 commit comments

Comments
 (0)