【Kafka】使用Wireshark抓包分析Kafka通信协议
Wireshark
什么是Wireshark?
Wireshark (前身 Ethereal)是一个网络封包分析软件。网络封包分析软件的功能是撷取网络封包,并尽可能显示出最为详细的网络封包资料。
是目前全球使用最广泛的开源抓包软件,其前身为Ethereal,是一个通用的网络数据嗅探器和协议分析器,由Gerald Combs编写并于1998年以GPL开源许可证发布。如果是网络工程师,可以通过Wireshark对网络进行 故障定位和排错; 如果安全工程师,可以通过Wireshark对网络 黑客渗透攻击进行快速定位并找出攻击源; 如果是测试或软件工程师,可以通过Wireshark 分析底层通讯机制等 。
界面介绍
打开WireShark,整个界面分为两部分——工具栏和窗格
最上面是工具栏,包含两部分
- 主工具栏:提供从菜单快速访问常用项目的功能,该工具栏不能由用户自定义
- “Filter” 工具栏:可以快速编辑和应用显示过滤器。
窗格从上到下总共有3块区域
Packet List窗格:显示当前捕获文件中的所有数据包
Packet Details窗格:数据包详细信息窗格以更详细的形式显示当前数据包
Packet Bytes窗格:数据包字节窗格以十六进制转储样式显示当前数据包的数据
使用显示过滤器
Wireshark 提供了一种显示过滤器语言,可以精确地控制显示哪些数据包。它们可用于检查协议或字段的存在,字段的值,甚至可以将两个字段相互比较。
显示过滤器字段
最简单的显示过滤器是显示单个协议的过滤器。要仅显示包含特定协议的数据包,请在 Wireshark 的显示过滤器工具栏中键入该协议。
例如,要仅显示 Kafka 数据包,请在 Wireshark 的显示过滤器工具栏中键入 kafka.
。
Wireshark内置支持的协议类型非常多,可以参考: https://www.wireshark.org/docs/dfref/
Wireshark支持的Kafka协议字段可参考此链接: https://www.wireshark.org/docs/dfref/k/kafka.html
比较值
可以使用多个不同的比较运算符来构建用于比较值的显示过滤器。
例如,要仅显示去往或来自 IP 地址 192.168.0.1 的数据包,请使用 ip.addr==192.168.0.1。
列出了可用的比较运算符的完整列表
Kafka通信协议
Kafka的Producer、Broker和Consumer之间采用的是一套自行设计的基于TCP层的协议。Kafka的这套协议完全是为了Kafka自身的业务需求而定制的,协议定义了所有 API 的请求及响应消息。
概述
Kafka 协议是相当简单的,只有六个核心客户端请求 API:
- 元数据(Metadata) – 描述当前可用的 brokers,brokers 的主机和端口信息,并提供了哪个 broker 托管了哪些分区的信息;
- 发送(Send) – 发送消息到 broker;
- 获取(Fetch) – 从 broker 上获取消息。主要分三类:一个用于获取数据,一个用于获取集群的元数据,还有一个用于获取 topic 的偏移量信息;
- 偏移量(Offsets) – 获取给定 topic 分区的可用偏移量信息;
- 提交偏移量(Offset Commit) – 提交消费者组(Consumer Group)的一组偏移量;
- 获取偏移量(Offset Fetch) – 为消费者组获取一组偏移量
此外,从 0.9 版本开始,Kafka 支持为消费者和 Kafka 连接进行分组管理。客户端 API 包括五个请求:
- 分组协调者(GroupCoordinator) – 用来定位一个分组的当前协调者。
- 加入分组(JoinGroup) – 成为某个分组的成员,当分组不存在(没有一个成员时)则创建分组。
- 同步分组(SyncGroup) – 同步分组中所有成员的状态(例如分发分区分配信息(Partition Assignments)到各个组员)。
- 心跳(Heartbeat) – 保持组内成员的活跃状态。
- 离开分组(LeaveGroup) – 直接离开一个组。
最后,有几个管理 API,可用于监控/管理 Kafka 集群:
- 描述消费者组(DescribeGroups) – 用于检查一组群体的当前状态(如:查看消费者分区分配)。
- 列出组(ListGroups) – 列出某个 broker 当前管理的所有组
这里不针对性细讲,完整的协议介绍可以参考:
版本和兼容性
协议的目的要达到在向后兼容的基础上渐进演化。版本是基于每个API基础之上,每个版本包括一个请求和响应对。每个请求包含API Key,里面包含了被调用的API标识,以及表示这些请求和响应格式的版本号。
0.9.0.1
Kafka集群支持如下ApiKey的请求
PRODUCE(0, "Produce"),
FETCH(1, "Fetch"),
LIST_OFFSETS(2, "Offsets"),
METADATA(3, "Metadata"),
LEADER_AND_ISR(4, "LeaderAndIsr"),
STOP_REPLICA(5, "StopReplica"),
UPDATE_METADATA_KEY(6, "UpdateMetadata"),
CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
OFFSET_COMMIT(8, "OffsetCommit"),
OFFSET_FETCH(9, "OffsetFetch"),
GROUP_COORDINATOR(10, "GroupCoordinator"),
JOIN_GROUP(11, "JoinGroup"),
HEARTBEAT(12, "Heartbeat"),
LEAVE_GROUP(13, "LeaveGroup"),
SYNC_GROUP(14, "SyncGroup"),
DESCRIBE_GROUPS(15, "DescribeGroups"),
LIST_GROUPS(16, "ListGroups");
服务器会拒绝它不支持的版本的请求,并始终返回它期望收到的能够完成请求响应的版本的协议格式。
通用的请求和响应格式
所有请求和响应都从以下语法为基础
RequestOrResponse => Size (RequestMessage | ResponseMessage) Size => int32
域 |
描述 |
---|---|
MessageSize |
MessageSize 域给出了后续请求或响应消息的字节(bytes)长度。客户端可以先读取4字节的长度N,然后读取并解析后续的N字节请求内容 |
请求(Requests)
RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage
ApiKey => int16
ApiVersion => int16
CorrelationId => int32
ClientId => string
RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
域 |
描述 |
---|---|
ApiKey |
这是一个表示所调用的API的数字id(即它表示是一个元数据请求?生产请求?获取请求等) |
ApiVersion |
这是该API的一个数字版本号。我们为每个API定义一个版本号,该版本号允许服务器根据版本号正确地解释请求内容。响应消息也始终对应于所述请求的版本的格式 |
CorrelationId |
这是一个用户提供的整数。它将会被服务器原封不动地回传给客户端。用于匹配客户机和服务器之间的请求和响应 |
ClientId |
这是为客户端应用程序的自定义的标识。用户可以使用他们喜欢的任何标识符,他们会被用在记录错误时,监测统计信息等场景。例如,你可能不仅想要监视每秒的总体请求,还要根据客户端应用程序进行监视,那它就可以被用上(其中每一个都将驻留在多个服务器上)。这个ID作为特定的客户端对所有的请求的逻辑分组 |
响应(Responses)
Response => CorrelationId ResponseMessage
CorrelationId => int32
ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse
域 |
描述 |
---|---|
CorrelationId |
服务器传回给客户端它所提供用作关联请求和响应消息的整数 |
所有响应都是与请求成对匹配(例如,发送回一个元数据请求,就会得到一个元数据响应)。
案例
kafka高版本Client连接0.9Serve
有高版本客户端连接0.9 Kafka集群时会出现生产和写入问题。
查看服务端日志如下:
看到java.lang.ArrayIndexOutOfBoundsException: 18
这个关键字报错,可以明确有apikey=18的请求访问0.9集群,从前面可以知道0.9集群ApiKey最大支持到16,当前要找出是哪个任务用高版本客户端访问该0.9集群
使用tcpdump+Wireshark抓包分析
tcpdump抓包
在服务端,根据kafka所使用9092端口抓包
tcpdump -i any -nn -vv tcp port 9092 -s 0 -w kafka.cap
wireshark分析
wireshark可能未能自动识别出kafka协议。首先检查一下Wireshark是否支持kafka协议解析。
出现以上信息说明wireshark支持Kafka协议,如果没有的话,更新wireshark最新版即可。当前笔者使用的是Version 3.4.5
接下来点选中一条数据消息右键,点击“Decode As”,在弹出窗口的“当前”下拉列表中选择“kafka”,然后点击“OK”。
可以看到除了tcp控制报文外,其他报文都被解析成kafka协议(如解析不出来,可尝试退出wireshark重新打开)。
Decode As临时设置解码器,退出Wireshark以后,这些设置会丢失
在“Filter” 工具栏中输入kafka.api\\_key == 18
搜索apikey=18的请求来自哪个ip和端口
根据来源IP找到是实时计算集群,结合作业发布平台找出对应时间段可能的任务一一核实,找开发确认后将任务停掉恢复。
在案例中,之前处理方案是Kafka开启Trace日志重启,根据日志的最近的报错IP来猜测,具有一定的随机性,使用Wireshark工具分析可以又快又准的找出来。
查看Fetch请求或响应报文的详细字段
Kafka Fetch Request
可以看到向两个Partition 分区53和13请求消息,53分区请求offset是3605043491,Max Bytes是1MB。
Kafka Fetch Response
可以看到返回两个Partition 分区53和13的消息,53返回的是offset是3605043491,消息大小是2981B
总结
Wireshark在支持协议的数量方面是出类拔萃的,目前已提供了超过上千种协议的支持。这些协议包括从最基础的IP协议和DHCP协议到高级的专用协议比如Appletalk和Bittorrente等。
Wireshark从1.12.0版本开始支持Kafka通信协议,到现在最新的3.4.5更完善支持协议。通过Wireshark分析学习Kafka通信协议加深对Kafka的理解和问题处理。
由于Wireshark在开源模式下进行开发,每次更新都会增加一些对新协议的支持。后续鲲鹏运维将考虑对Pulsar协议的支持调研。
参考资料: