1616
1717package org .springframework .kafka .listener ;
1818
19+ import java .time .Duration ;
20+ import java .util .ArrayList ;
21+ import java .util .HashMap ;
22+ import java .util .List ;
23+ import java .util .Map ;
24+ import java .util .concurrent .CountDownLatch ;
25+ import java .util .concurrent .TimeUnit ;
26+
1927import org .apache .kafka .clients .consumer .Consumer ;
2028import org .apache .kafka .clients .consumer .ConsumerRecord ;
2129import org .apache .kafka .clients .consumer .ConsumerRecords ;
2230import org .apache .kafka .common .TopicPartition ;
31+
2332import org .junit .jupiter .api .Test ;
33+
2434import org .springframework .kafka .core .ConsumerFactory ;
2535import org .springframework .kafka .listener .adapter .FilteringMessageListenerAdapter ;
2636import org .springframework .kafka .listener .adapter .RecordFilterStrategy ;
2737
28- import java .time .Duration ;
29- import java .util .*;
30- import java .util .concurrent .CountDownLatch ;
31- import java .util .concurrent .TimeUnit ;
32-
3338import static org .assertj .core .api .Assertions .assertThat ;
3439import static org .mockito .ArgumentMatchers .any ;
3540import static org .mockito .ArgumentMatchers .anyMap ;
3641import static org .mockito .BDDMockito .given ;
37- import static org .mockito .Mockito .*;
42+
43+ import static org .mockito .Mockito .mock ;
44+ import static org .mockito .Mockito .verify ;
45+ import static org .mockito .Mockito .times ;
3846
3947/**
4048 * Tests to verify the behavior of RECORD acknowledge mode when used with filtering strategies.
4654 */
4755public class AckModeRecordWithFilteringTest {
4856
49- @ SuppressWarnings ("unchecked" )
57+ @ SuppressWarnings ({ "unchecked" , "deprecation" } )
5058 @ Test
5159 public void testCurrentRecordModeCommitsAllRecords () throws InterruptedException {
5260 // Given: A container with RECORD ack mode and a filter that filters out even offsets
@@ -89,7 +97,7 @@ public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException
8997
9098 given (consumer .poll (any (Duration .class )))
9199 .willReturn (consumerRecords )
92- .willReturn (new ConsumerRecords <>( Collections . emptyMap () ));
100+ .willReturn (ConsumerRecords . empty ( ));
93101
94102 // When: Start the container and process records
95103 container .start ();
@@ -103,7 +111,7 @@ public void testCurrentRecordModeCommitsAllRecords() throws InterruptedException
103111 verify (consumer , times (4 )).commitSync (any (), any (Duration .class ));
104112 }
105113
106- @ SuppressWarnings ("unchecked" )
114+ @ SuppressWarnings ({ "unchecked" , "deprecation" } )
107115 @ Test
108116 public void testAllRecordsFilteredStillCommits () throws InterruptedException {
109117 // Given: A container where all records are filtered
@@ -139,7 +147,7 @@ public void testAllRecordsFilteredStillCommits() throws InterruptedException {
139147
140148 given (consumer .poll (any (Duration .class )))
141149 .willReturn (consumerRecords )
142- .willReturn (new ConsumerRecords <>( Collections . emptyMap () ));
150+ .willReturn (ConsumerRecords . empty ( ));
143151
144152 // When: Start the container
145153 container .start ();
@@ -151,7 +159,7 @@ public void testAllRecordsFilteredStillCommits() throws InterruptedException {
151159 verify (consumer , times (2 )).commitSync (any (), any (Duration .class ));
152160 }
153161
154- @ SuppressWarnings ("unchecked" )
162+ @ SuppressWarnings ({ "unchecked" , "deprecation" } )
155163 @ Test
156164 public void testMixedPartitionsWithFiltering () throws InterruptedException {
157165 // Given: Multiple partitions with different records
@@ -201,7 +209,7 @@ record -> record.value().contains("skip");
201209
202210 given (consumer .poll (any (Duration .class )))
203211 .willReturn (consumerRecords )
204- .willReturn (new ConsumerRecords <>( Collections . emptyMap () ));
212+ .willReturn (ConsumerRecords . empty ( ));
205213
206214 // When: Start container
207215 container .start ();
@@ -215,7 +223,7 @@ record -> record.value().contains("skip");
215223 verify (consumer , times (5 )).commitSync (any (), any (Duration .class ));
216224 }
217225
218- @ SuppressWarnings ("unchecked" )
226+ @ SuppressWarnings ({ "unchecked" , "deprecation" } )
219227 @ Test
220228 public void testCommitLogging () throws InterruptedException {
221229 ConsumerFactory <String , String > consumerFactory = mock (ConsumerFactory .class );
@@ -251,7 +259,7 @@ public void testCommitLogging() throws InterruptedException {
251259
252260 given (consumer .poll (any (Duration .class )))
253261 .willReturn (consumerRecords )
254- .willReturn (new ConsumerRecords <>( Collections . emptyMap () ));
262+ .willReturn (ConsumerRecords . empty ( ));
255263
256264 // When
257265 container .start ();
@@ -262,7 +270,7 @@ public void testCommitLogging() throws InterruptedException {
262270 verify (consumer , times (2 )).commitSync (anyMap (), any (Duration .class ));
263271 }
264272
265- @ SuppressWarnings ("unchecked" )
273+ @ SuppressWarnings ({ "unchecked" , "deprecation" } )
266274 @ Test
267275 public void testAckDiscardedParameterBehavior () throws InterruptedException {
268276 ConsumerFactory <String , String > consumerFactory = mock (ConsumerFactory .class );
@@ -303,7 +311,7 @@ public void testAckDiscardedParameterBehavior() throws InterruptedException {
303311
304312 given (consumer .poll (any (Duration .class )))
305313 .willReturn (consumerRecords )
306- .willReturn (new ConsumerRecords <>( Collections . emptyMap () ));
314+ .willReturn (ConsumerRecords . empty ( ));
307315
308316 container .start ();
309317 assertThat (processedLatch .await (5 , TimeUnit .SECONDS )).isTrue ();
0 commit comments