1- package com .alibabacloud .mse .demo ;
1+ package com .alibabacloud .mse .demo . a . mq ;
22
3- import com .alibabacloud .mse .demo .service .MqConsumer ;
43import lombok .RequiredArgsConstructor ;
54import lombok .extern .slf4j .Slf4j ;
65import org .apache .rocketmq .client .consumer .DefaultMQPushConsumer ;
76import org .apache .rocketmq .client .exception .MQClientException ;
87import org .apache .rocketmq .common .consumer .ConsumeFromWhere ;
8+ import org .springframework .beans .factory .annotation .Autowired ;
9+ import org .springframework .beans .factory .annotation .Qualifier ;
910import org .springframework .beans .factory .annotation .Value ;
11+ import org .springframework .cloud .commons .util .InetUtils ;
1012import org .springframework .context .annotation .Bean ;
1113import org .springframework .context .annotation .Configuration ;
14+ import org .springframework .web .client .RestTemplate ;
1215
1316@ Slf4j
1417@ Configuration
@@ -24,7 +27,15 @@ public class RocketMqConfiguration {
2427 @ Value ("${rocketmq.consumer.topic}" )
2528 private String topic ;
2629
27- private final MqConsumer mqConsumer ;
30+ @ Autowired
31+ @ Qualifier ("loadBalancedRestTemplate" )
32+ private RestTemplate restTemplate ;
33+
34+ @ Autowired
35+ private InetUtils inetUtils ;
36+
37+ @ Autowired
38+ private String serviceTag ;
2839
2940 static {
3041 System .setProperty ("rocketmq.client.log.loadconfig" , "false" );
@@ -37,6 +48,12 @@ public DefaultMQPushConsumer mqPushConsumer() throws MQClientException {
3748 consumer .setNamesrvAddr (nameSrvAddr );
3849 consumer .subscribe (topic , "*" );
3950 consumer .setConsumeFromWhere (ConsumeFromWhere .CONSUME_FROM_FIRST_OFFSET );
51+
52+ MqConsumer mqConsumer = new MqConsumer (
53+ restTemplate ,
54+ inetUtils ,
55+ serviceTag
56+ );
4057 consumer .registerMessageListener (mqConsumer );
4158 log .info ("完成启动rocketMq的consumer,subscribe:{}" , topic );
4259 return consumer ;
0 commit comments