@@ -22,6 +22,7 @@ import ServiceLifecycle
2222/// `NIOAsyncSequenceProducerDelegate` that terminates the closes the producer when
2323/// `didTerminate()` is invoked.
2424internal struct KafkaConsumerCloseOnTerminate : Sendable {
25+ let isMessageSequence : Bool
2526 let stateMachine : NIOLockedValueBox < KafkaConsumer . StateMachine >
2627}
2728
@@ -31,7 +32,7 @@ extension KafkaConsumerCloseOnTerminate: NIOAsyncSequenceProducerDelegate {
3132 }
3233
3334 func didTerminate( ) {
34- self . stateMachine. withLockedValue { $0. messageSequenceTerminated ( ) }
35+ self . stateMachine. withLockedValue { $0. messageSequenceTerminated ( isMessageSequence : isMessageSequence ) }
3536 }
3637}
3738
@@ -121,6 +122,12 @@ public final class KafkaConsumer: Sendable, Service {
121122 NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ,
122123 KafkaConsumerCloseOnTerminate
123124 >
125+ typealias ProducerEvents = NIOAsyncSequenceProducer <
126+ KafkaConsumerEvent ,
127+ NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ,
128+ KafkaConsumerCloseOnTerminate
129+ >
130+
124131 /// The configuration object of the consumer client.
125132 private let config : KafkaConsumerConfiguration
126133 /// A logger.
@@ -146,7 +153,8 @@ public final class KafkaConsumer: Sendable, Service {
146153 client: RDKafkaClient ,
147154 stateMachine: NIOLockedValueBox < StateMachine > ,
148155 config: KafkaConsumerConfiguration ,
149- logger: Logger
156+ logger: Logger ,
157+ eventSource: ProducerEvents . Source ? = nil
150158 ) throws {
151159 self . config = config
152160 self . stateMachine = stateMachine
@@ -155,7 +163,7 @@ public final class KafkaConsumer: Sendable, Service {
155163 let sourceAndSequence = NIOThrowingAsyncSequenceProducer . makeSequence (
156164 elementType: KafkaConsumerMessage . self,
157165 backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ( ) ,
158- delegate: KafkaConsumerCloseOnTerminate ( stateMachine: self . stateMachine)
166+ delegate: KafkaConsumerCloseOnTerminate ( isMessageSequence : true , stateMachine: self . stateMachine)
159167 )
160168
161169 self . messages = KafkaConsumerMessages (
@@ -166,7 +174,8 @@ public final class KafkaConsumer: Sendable, Service {
166174 self . stateMachine. withLockedValue {
167175 $0. initialize (
168176 client: client,
169- source: sourceAndSequence. source
177+ source: sourceAndSequence. source,
178+ eventSource: eventSource
170179 )
171180 }
172181
@@ -242,6 +251,11 @@ public final class KafkaConsumer: Sendable, Service {
242251 if config. enableAutoCommit == false {
243252 subscribedEvents. append ( . offsetCommit)
244253 }
254+ // Don't listen to statistics even if configured
255+ // As there are no events instantiated
256+ // if config.statisticsInterval != .zero {
257+ // subscribedEvents.append(.statistics)
258+ // }
245259
246260 let client = try RDKafkaClient . makeClient (
247261 type: . consumer,
@@ -250,20 +264,22 @@ public final class KafkaConsumer: Sendable, Service {
250264 logger: logger
251265 )
252266
253- let consumer = try KafkaConsumer (
254- client: client,
255- stateMachine: stateMachine,
256- config: config,
257- logger: logger
258- )
259-
260267 let sourceAndSequence = NIOAsyncSequenceProducer . makeSequence (
261268 elementType: KafkaConsumerEvent . self,
262269 backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ( ) ,
263- delegate: KafkaConsumerCloseOnTerminate ( stateMachine: stateMachine)
270+ delegate: KafkaConsumerCloseOnTerminate ( isMessageSequence : false , stateMachine: stateMachine)
264271 )
265272
266273 let eventsSequence = KafkaConsumerEvents ( wrappedSequence: sourceAndSequence. sequence)
274+
275+ let consumer = try KafkaConsumer (
276+ client: client,
277+ stateMachine: stateMachine,
278+ config: config,
279+ logger: logger,
280+ eventSource: sourceAndSequence. source
281+ )
282+
267283 return ( consumer, eventsSequence)
268284 }
269285
@@ -321,7 +337,7 @@ public final class KafkaConsumer: Sendable, Service {
321337 while !Task. isCancelled {
322338 let nextAction = self . stateMachine. withLockedValue { $0. nextPollLoopAction ( ) }
323339 switch nextAction {
324- case . pollForAndYieldMessage( let client, let source) :
340+ case . pollForAndYieldMessage( let client, let source, let eventSource ) :
325341 let events = client. eventPoll ( )
326342 for event in events {
327343 switch event {
@@ -332,8 +348,11 @@ public final class KafkaConsumer: Sendable, Service {
332348 _ = source. yield ( message)
333349 case . failure( let error) :
334350 source. finish ( )
351+ eventSource? . finish ( )
335352 throw error
336353 }
354+ case . statistics( let statistics) :
355+ _ = eventSource? . yield ( . statistics( statistics) )
337356 default :
338357 break // Ignore
339358 }
@@ -383,8 +402,9 @@ public final class KafkaConsumer: Sendable, Service {
383402 client: client,
384403 logger: self . logger
385404 )
386- case . triggerGracefulShutdownAndFinishSource( let client, let source) :
405+ case . triggerGracefulShutdownAndFinishSource( let client, let source, let eventSource ) :
387406 source. finish ( )
407+ eventSource? . finish ( )
388408 self . _triggerGracefulShutdown (
389409 client: client,
390410 logger: self . logger
@@ -428,17 +448,20 @@ extension KafkaConsumer {
428448 ///
429449 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
430450 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
451+ /// - Parameter eventSource: ``NIOAsyncSequenceProducer/Source`` used for yielding new events.
431452 case initializing(
432453 client: RDKafkaClient ,
433- source: Producer . Source
454+ source: Producer . Source ,
455+ eventSource: ProducerEvents . Source ?
434456 )
435457 /// The ``KafkaConsumer`` is consuming messages.
436458 ///
437459 /// - Parameter client: Client used for handling the connection to the Kafka cluster.
438- /// - Parameter source : ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
460+ /// - Parameter eventSource : ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
439461 case consuming(
440462 client: RDKafkaClient ,
441- source: Producer . Source
463+ source: Producer . Source ,
464+ eventSource: ProducerEvents . Source ?
442465 )
443466 /// Consumer is still running but the messages asynchronous sequence was terminated.
444467 /// All incoming messages will be dropped.
@@ -461,14 +484,16 @@ extension KafkaConsumer {
461484 /// not yet available when the normal initialization occurs.
462485 mutating func initialize(
463486 client: RDKafkaClient ,
464- source: Producer . Source
487+ source: Producer . Source ,
488+ eventSource: ProducerEvents . Source ?
465489 ) {
466490 guard case . uninitialized = self . state else {
467491 fatalError ( " \( #function) can only be invoked in state .uninitialized, but was invoked in state \( self . state) " )
468492 }
469493 self . state = . initializing(
470494 client: client,
471- source: source
495+ source: source,
496+ eventSource: eventSource
472497 )
473498 }
474499
@@ -480,7 +505,8 @@ extension KafkaConsumer {
480505 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
481506 case pollForAndYieldMessage(
482507 client: RDKafkaClient ,
483- source: Producer . Source
508+ source: Producer . Source ,
509+ eventSource: ProducerEvents . Source ?
484510 )
485511 /// The ``KafkaConsumer`` stopped consuming messages or
486512 /// is in the process of shutting down.
@@ -502,8 +528,8 @@ extension KafkaConsumer {
502528 fatalError ( " \( #function) invoked while still in state \( self . state) " )
503529 case . initializing:
504530 fatalError ( " Subscribe to consumer group / assign to topic partition pair before reading messages " )
505- case . consuming( let client, let source) :
506- return . pollForAndYieldMessage( client: client, source: source)
531+ case . consuming( let client, let source, let eventSource ) :
532+ return . pollForAndYieldMessage( client: client, source: source, eventSource : eventSource )
507533 case . consumptionStopped( let client) :
508534 return . pollWithoutYield( client: client)
509535 case . finishing( let client) :
@@ -532,10 +558,11 @@ extension KafkaConsumer {
532558 switch self . state {
533559 case . uninitialized:
534560 fatalError ( " \( #function) invoked while still in state \( self . state) " )
535- case . initializing( let client, let source) :
561+ case . initializing( let client, let source, let eventSource ) :
536562 self . state = . consuming(
537563 client: client,
538- source: source
564+ source: source,
565+ eventSource: eventSource
539566 )
540567 return . setUpConnection( client: client)
541568 case . consuming, . consumptionStopped, . finishing, . finished:
@@ -545,16 +572,30 @@ extension KafkaConsumer {
545572
546573 /// The messages asynchronous sequence was terminated.
547574 /// All incoming messages will be dropped.
548- mutating func messageSequenceTerminated( ) {
575+ mutating func messageSequenceTerminated( isMessageSequence : Bool ) {
549576 switch self . state {
550577 case . uninitialized:
551578 fatalError ( " \( #function) invoked while still in state \( self . state) " )
552579 case . initializing:
553580 fatalError ( " Call to \( #function) before setUpConnection() was invoked " )
554581 case . consumptionStopped:
555- fatalError ( " messageSequenceTerminated() must not be invoked more than once " )
556- case . consuming( let client, _) :
557- self . state = . consumptionStopped( client: client)
582+ if isMessageSequence {
583+ fatalError ( " messageSequenceTerminated() must not be invoked more than once " )
584+ }
585+ case . consuming( let client, let source, let eventSource) :
586+ // only move to stopping if messages sequence was finished
587+ if isMessageSequence {
588+ self . state = . consumptionStopped( client: client)
589+ // If message sequence is being terminated, it means class deinit is called
590+ // see `messages` field, it is last change to call finish for `eventSource`
591+ eventSource? . finish ( )
592+ }
593+ else {
594+ // Messages are still consuming, only event source was finished
595+ // Ok, probably, noone wants to listen to events,
596+ // though it might be very bad for rebalancing
597+ self . state = . consuming( client: client, source: source, eventSource: nil )
598+ }
558599 case . finishing, . finished:
559600 break
560601 }
@@ -576,7 +617,7 @@ extension KafkaConsumer {
576617 fatalError ( " Subscribe to consumer group / assign to topic partition pair before committing offsets " )
577618 case . consumptionStopped:
578619 fatalError ( " Cannot store offset when consumption has been stopped " )
579- case . consuming( let client, _) :
620+ case . consuming( let client, _, _ ) :
580621 return . storeOffset( client: client)
581622 case . finishing, . finished:
582623 fatalError ( " \( #function) invoked while still in state \( self . state) " )
@@ -607,7 +648,7 @@ extension KafkaConsumer {
607648 fatalError ( " Subscribe to consumer group / assign to topic partition pair before committing offsets " )
608649 case . consumptionStopped:
609650 fatalError ( " Cannot commit when consumption has been stopped " )
610- case . consuming( let client, _) :
651+ case . consuming( let client, _, _ ) :
611652 return . commitSync( client: client)
612653 case . finishing, . finished:
613654 return . throwClosedError
@@ -628,7 +669,8 @@ extension KafkaConsumer {
628669 /// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
629670 case triggerGracefulShutdownAndFinishSource(
630671 client: RDKafkaClient ,
631- source: Producer . Source
672+ source: Producer . Source ,
673+ eventSource: ProducerEvents . Source ?
632674 )
633675 }
634676
@@ -642,11 +684,12 @@ extension KafkaConsumer {
642684 fatalError ( " \( #function) invoked while still in state \( self . state) " )
643685 case . initializing:
644686 fatalError ( " subscribe() / assign() should have been invoked before \( #function) " )
645- case . consuming( let client, let source) :
687+ case . consuming( let client, let source, let eventSource ) :
646688 self . state = . finishing( client: client)
647689 return . triggerGracefulShutdownAndFinishSource(
648690 client: client,
649- source: source
691+ source: source,
692+ eventSource: eventSource
650693 )
651694 case . consumptionStopped( let client) :
652695 self . state = . finishing( client: client)
0 commit comments