@@ -35,6 +35,29 @@ extension KafkaConsumerCloseOnTerminate: NIOAsyncSequenceProducerDelegate {
3535 }
3636}
3737
38+ // MARK: - KafkaConsumerEvents
39+
40+ /// `AsyncSequence` implementation for handling ``KafkaConsumerEvent``s emitted by Kafka.
41+ public struct KafkaConsumerEvents : Sendable , AsyncSequence {
42+ public typealias Element = KafkaConsumerEvent
43+ typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure
44+ typealias WrappedSequence = NIOAsyncSequenceProducer < Element , BackPressureStrategy , KafkaConsumerCloseOnTerminate >
45+ let wrappedSequence : WrappedSequence
46+
47+ /// `AsynceIteratorProtocol` implementation for handling ``KafkaConsumerEvent``s emitted by Kafka.
48+ public struct AsyncIterator : AsyncIteratorProtocol {
49+ var wrappedIterator : WrappedSequence . AsyncIterator
50+
51+ public mutating func next( ) async -> Element ? {
52+ await self . wrappedIterator. next ( )
53+ }
54+ }
55+
56+ public func makeAsyncIterator( ) -> AsyncIterator {
57+ return AsyncIterator ( wrappedIterator: self . wrappedSequence. makeAsyncIterator ( ) )
58+ }
59+ }
60+
3861// MARK: - KafkaConsumerMessages
3962
4063/// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
@@ -52,7 +75,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
5275 let wrappedSequence : WrappedSequence
5376
5477 /// `AsynceIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
55- public struct ConsumerMessagesAsyncIterator : AsyncIteratorProtocol {
78+ public struct AsyncIterator : AsyncIteratorProtocol {
5679 let stateMachine : NIOLockedValueBox < KafkaConsumer . StateMachine >
5780 var wrappedIterator : WrappedSequence . AsyncIterator ?
5881
@@ -80,8 +103,8 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
80103 }
81104 }
82105
83- public func makeAsyncIterator( ) -> ConsumerMessagesAsyncIterator {
84- return ConsumerMessagesAsyncIterator (
106+ public func makeAsyncIterator( ) -> AsyncIterator {
107+ return AsyncIterator (
85108 stateMachine: self . stateMachine,
86109 wrappedIterator: self . wrappedSequence. makeAsyncIterator ( )
87110 )
@@ -108,28 +131,27 @@ public final class KafkaConsumer: Sendable, Service {
108131 /// `AsyncSequence` that returns all ``KafkaConsumerMessage`` objects that the consumer receives.
109132 public let messages : KafkaConsumerMessages
110133
134+ // Private initializer, use factory method or convenience init to create KafkaConsumer
111135 /// Initialize a new ``KafkaConsumer``.
112136 /// To listen to incoming messages, please subscribe to a list of topics using ``subscribe(topics:)``
113137 /// or assign the consumer to a particular topic + partition pair using ``assign(topic:partition:offset:)``.
114- /// - Parameter config: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``.
115- /// - Parameter logger: A logger.
138+ ///
139+ /// - Parameters:
140+ /// - client: Client used for handling the connection to the Kafka cluster.
141+ /// - stateMachine: The state machine containing the state of the ``KafkaConsumer``.
142+ /// - config: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``.
143+ /// - logger: A logger.
116144 /// - Throws: A ``KafkaError`` if the initialization failed.
117- public init (
145+ private init (
146+ client: RDKafkaClient ,
147+ stateMachine: NIOLockedValueBox < StateMachine > ,
118148 config: KafkaConsumerConfiguration ,
119149 logger: Logger
120150 ) throws {
121151 self . config = config
152+ self . stateMachine = stateMachine
122153 self . logger = logger
123154
124- let client = try RDKafkaClient . makeClient (
125- type: . consumer,
126- configDictionary: config. dictionary,
127- events: [ . log, . fetch, . offsetCommit] ,
128- logger: logger
129- )
130-
131- self . stateMachine = NIOLockedValueBox ( StateMachine ( logger: self . logger) )
132-
133155 let sourceAndSequence = NIOThrowingAsyncSequenceProducer . makeSequence (
134156 elementType: KafkaConsumerMessage . self,
135157 backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ( ) ,
@@ -160,6 +182,91 @@ public final class KafkaConsumer: Sendable, Service {
160182 }
161183 }
162184
185+ /// Initialize a new ``KafkaConsumer``.
186+ ///
187+ /// This creates a consumer without that does not listen to any events other than consumer messages.
188+ ///
189+ /// - Parameters:
190+ /// - config: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``.
191+ /// - logger: A logger.
192+ /// - Returns: The newly created ``KafkaConsumer``.
193+ /// - Throws: A ``KafkaError`` if the initialization failed.
194+ public convenience init (
195+ config: KafkaConsumerConfiguration ,
196+ logger: Logger
197+ ) throws {
198+ let stateMachine = NIOLockedValueBox ( StateMachine ( logger: logger) )
199+
200+ var subscribedEvents : [ RDKafkaEvent ] = [ . log, . fetch]
201+ // Only listen to offset commit events when autoCommit is false
202+ if config. enableAutoCommit == false {
203+ subscribedEvents. append ( . offsetCommit)
204+ }
205+
206+ let client = try RDKafkaClient . makeClient (
207+ type: . consumer,
208+ configDictionary: config. dictionary,
209+ events: subscribedEvents,
210+ logger: logger
211+ )
212+
213+ try self . init (
214+ client: client,
215+ stateMachine: stateMachine,
216+ config: config,
217+ logger: logger
218+ )
219+ }
220+
221+ /// Initialize a new ``KafkaConsumer`` and a ``KafkaConsumerEvents`` asynchronous sequence.
222+ ///
223+ /// Use the asynchronous sequence to consume events.
224+ ///
225+ /// - Important: When the asynchronous sequence is deinited the producer will be shutdown and disallow sending more messages.
226+ /// Additionally, make sure to consume the asynchronous sequence otherwise the events will be buffered in memory indefinitely.
227+ ///
228+ /// - Parameters:
229+ /// - config: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``.
230+ /// - logger: A logger.
231+ /// - Returns: A tuple containing the created ``KafkaConsumer`` and the ``KafkaConsumerEvents``
232+ /// `AsyncSequence` used for receiving message events.
233+ /// - Throws: A ``KafkaError`` if the initialization failed.
234+ public static func makeConsumerWithEvents(
235+ config: KafkaConsumerConfiguration ,
236+ logger: Logger
237+ ) throws -> ( KafkaConsumer , KafkaConsumerEvents ) {
238+ let stateMachine = NIOLockedValueBox ( StateMachine ( logger: logger) )
239+
240+ var subscribedEvents : [ RDKafkaEvent ] = [ . log, . fetch]
241+ // Only listen to offset commit events when autoCommit is false
242+ if config. enableAutoCommit == false {
243+ subscribedEvents. append ( . offsetCommit)
244+ }
245+
246+ let client = try RDKafkaClient . makeClient (
247+ type: . consumer,
248+ configDictionary: config. dictionary,
249+ events: subscribedEvents,
250+ logger: logger
251+ )
252+
253+ let consumer = try KafkaConsumer (
254+ client: client,
255+ stateMachine: stateMachine,
256+ config: config,
257+ logger: logger
258+ )
259+
260+ let sourceAndSequence = NIOAsyncSequenceProducer . makeSequence (
261+ elementType: KafkaConsumerEvent . self,
262+ backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies . NoBackPressure ( ) ,
263+ delegate: KafkaConsumerCloseOnTerminate ( stateMachine: stateMachine)
264+ )
265+
266+ let eventsSequence = KafkaConsumerEvents ( wrappedSequence: sourceAndSequence. sequence)
267+ return ( consumer, eventsSequence)
268+ }
269+
163270 /// Subscribe to the given list of `topics`.
164271 /// The partition assignment happens automatically using `KafkaConsumer`'s consumer group.
165272 /// - Parameter topics: An array of topic names to subscribe to.
0 commit comments