【技术种草】CKafka调优笔记 消费堆积 服务CPU未跑满应该如何解决?
1. 背景
Proxy服务负责消费CKafka消息并解析,并分发消息至不同的CKafka topic。近期发现Proxy服务消费CKafka有消息堆积,且服务所在CVM CPU与内存资源大概只占用50%左右。
如图所示可以看到,在数据量峰值的的时候,生产流量可以达到2000MB/小时,但是消费流量达不到这么多,说明该服务有消息堆积。
其他说明:CKafka partition数量与服务实例数量正好一比一关系,CKafka 消费Client Concurrence设置为1。Proxy服务维护一个线程池,用于解析与分发消费的每一条消息。每当有消息进入服务时,每条消息会用一个线程进行解析消息并发送数据。
@KafkaListener(topics = "topic")
public void consumerKafkaMsg(List<ConsumerRecord<?, String>> records) throws Exception {
for (ConsumerRecord<?, String> record : records) {
log.debug("kafka topic = {}, value:\\n{}", record.topic(), record.value());
service.handleMsg(record);
}
}
@PostConstruct
public void init() {
BlockingQueue<Runnable> workingQueue = new ArrayBlockingQueue<Runnable>(consumerCount);
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("kafka-consumer-%d").build();
threadPool = new ThreadPoolExecutor(consumerCount, consumerCount, 0L, TimeUnit.MILLISECONDS,
workingQueue, namedThreadFactory, rejectedExecutionHandler);
}
public void handleMsg(ConsumerRecord<?, String> record) {
threadPool.execute(new ThreadPoolTask(recorde));
}
2. 问题分析
使用Arthas工具分析一下堆栈,如下图,可以看到每个线程都在TIMED_WAITING的等待状态,CPU消耗也很低,初步判断消费堆积并不是因为线程数量不够,而是卡在IO。
3.3 CKafka Producer 参数修改
同时重新查看Arthas里面每个线程的状态,线程卡在kafkaTemplate里面的dosent方法,再往上是awiat
腾讯云监控还是起了很大作用,在调优过程有很大参考意义,Ckafka或者组件都需要进行适当的参数调整才能发挥最大作用
效果还是比较明显可以看到机器CPU负载提高显著,未消费的Kafka消息也慢慢降低,达到预期。