diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py index 99f334bd245101..0246a48374ec49 100644 --- a/src/sentry/consumers/__init__.py +++ b/src/sentry/consumers/__init__.py @@ -468,6 +468,7 @@ def get_stream_processor( shutdown_strategy_before_consumer: bool = False, add_global_tags: bool = False, profile_consumer_join: bool = False, + enable_autocommit: bool = False, ) -> StreamProcessor: from sentry.utils import kafka_config @@ -525,6 +526,7 @@ def build_consumer_config(group_id: str): group_id=group_id, auto_offset_reset=auto_offset_reset, strict_offset_reset=strict_offset_reset, + enable_auto_commit=enable_autocommit, ) if max_poll_interval_ms is not None: @@ -537,6 +539,10 @@ def build_consumer_config(group_id: str): if group_instance_id is not None: consumer_config["group.instance.id"] = group_instance_id + if enable_autocommit: + # Set commit interval to 1 second (1000ms) + consumer_config["auto.commit.interval.ms"] = 1000 + return consumer_config consumer: Consumer = KafkaConsumer(build_consumer_config(group_id)) diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index 48861f8e76dc5e..ff24c4c9f696d1 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -550,6 +550,12 @@ def taskbroker_send_tasks( default=False, help="Adds a ProcessingStrategy to the start of a consumer that records a transaction of the consumer's join() method.", ) +@click.option( + "--enable-autocommit", + is_flag=True, + default=False, + help="Enable Kafka autocommit mode with 1s commit interval. Offsets are stored via store_offsets and rdkafka commits them automatically.", +) @configuration def basic_consumer( consumer_name: str, @@ -557,6 +563,7 @@ def basic_consumer( topic: str | None, kafka_slice_id: int | None, quantized_rebalance_delay_secs: int | None, + enable_autocommit: bool, **options: Any, ) -> None: """ @@ -594,6 +601,7 @@ def basic_consumer( topic=topic, kafka_slice_id=kafka_slice_id, add_global_tags=True, + enable_autocommit=enable_autocommit, **options, )